diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java deleted file mode 100644 index de018de82f47e879f7d4e1ecad5fd75abe527990..0000000000000000000000000000000000000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph; - -/** - * The exception that gets thrown when the degree option or the number of vertices - * option in {@link org.apache.flink.graph.spargel.IterationConfiguration} was not set. - */ -public class InaccessibleMethodException extends Exception { - - public InaccessibleMethodException() {} - - public InaccessibleMethodException(String text) { - super(text); - } -} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java deleted file mode 100644 index b692475426c763e32f047b809a5158500be6bd0a..0000000000000000000000000000000000000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph; - -import java.io.Serializable; - -/** - * Represents the graph's nodes. It carries an ID and a value as well as the vertex inDegree and outDegree. - * For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type. - * - * @param - * @param - */ -public class VertexWithDegrees & Serializable, V extends Serializable> - extends Vertex { - - private long inDegree; - - private long outDegree; - - public VertexWithDegrees() { - super(); - inDegree = -1l; - outDegree = -1l; - } - - public VertexWithDegrees(K k, V v) { - super(k,v); - inDegree = 0l; - outDegree = 0l; - } - - public Long getInDegree() throws Exception{ - if(inDegree == -1) { - throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " + - "call iterationConfiguration.setOptDegrees(true)."); - } - return inDegree; - } - - public void setInDegree(Long inDegree) { - this.inDegree = inDegree; - } - - public Long getOutDegree() throws Exception{ - if(outDegree == -1) { - throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " + - "call iterationConfiguration.setOptDegrees(true)."); - } - return outDegree; - } - - public void setOutDegree(Long outDegree) { - this.outDegree = outDegree; - } -} diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java index ec43207efcb4ad18c809379566b5b6238395c7c2..365518c1c680c01cd7b4a6c7139f1c727feec375 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java @@ -26,11 +26,10 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.VertexWithDegrees; import org.apache.flink.graph.example.utils.IncrementalSSSPData; -import org.apache.flink.graph.spargel.IterationConfiguration; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; +import org.apache.flink.graph.spargel.VertexCentricConfiguration; import org.apache.flink.graph.spargel.VertexUpdateFunction; import org.apache.flink.graph.utils.Tuple2ToVertexMap; import org.apache.flink.graph.utils.Tuple3ToEdgeMap; @@ -95,7 +94,7 @@ public class IncrementalSSSPExample implements ProgramDescription { graph.removeEdge(edgeToBeRemoved); // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); + VertexCentricConfiguration parameters = new VertexCentricConfiguration(); if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) { @@ -160,7 +159,7 @@ public class IncrementalSSSPExample implements ProgramDescription { @Override public void updateVertex(Vertex vertex, MessageIterator inMessages) throws Exception { if (inMessages.hasNext()) { - Long outDegree = ((VertexWithDegrees)vertex).getOutDegree() - 1; + Long outDegree = getOutDegree() - 1; // check if the vertex has another SP-Edge if (outDegree > 0) { // there is another shortest path from the source to this vertex diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java index a2ba2ac7bd4d6a73755d51a4ceb12006f0bdbf56..7b536e5e27757ae110eb5f095b99f357988272d0 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java @@ -20,6 +20,7 @@ package org.apache.flink.graph.library; import org.apache.flink.graph.Graph; import org.apache.flink.graph.GraphAlgorithm; +import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexUpdateFunction; @@ -60,7 +61,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm { @Override - public void updateVertex(Long id, Long currentMin, MessageIterator messages) throws Exception { + public void updateVertex(Vertex vertex, MessageIterator messages) throws Exception { long min = Long.MAX_VALUE; for (long msg : messages) { @@ -68,7 +69,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm { @Override - public void sendMessages(Long id, Long currentMin) throws Exception { + public void sendMessages(Vertex vertex) throws Exception { // send current minimum to neighbors - sendMessageToAllNeighbors(currentMin); + sendMessageToAllNeighbors(vertex.getValue()); } } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java index b3092fd01b7944434d1e4a912d8124464c9a855b..4245c245838bca6943aa55a7b9e15a082be6aeef 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java @@ -26,24 +26,21 @@ import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.InaccessibleMethodException; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.VertexWithDegrees; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; /** * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}. * - * @param The type of the vertex key (the vertex identifier). - * @param The type of the vertex value (the state of the vertex). + * @param The type of the vertex key (the vertex identifier). + * @param The type of the vertex value (the state of the vertex). * @param The type of the message sent between vertices along the edges. - * @param The type of the values that are associated with the edges. + * @param The type of the values that are associated with the edges. */ -public abstract class MessagingFunction implements Serializable { +public abstract class MessagingFunction implements Serializable { private static final long serialVersionUID = 1L; @@ -54,11 +51,12 @@ public abstract class MessagingFunction vertex) throws Exception; + public abstract void sendMessages(Vertex vertex) throws Exception; /** * This method is executed one per superstep before the vertex update function is invoked for each vertex. @@ -117,12 +119,12 @@ public abstract class MessagingFunction> getEdges() { + public Iterable> getEdges() { if (edgesUsed) { throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once."); } edgesUsed = true; - this.edgeIterator.set((Iterator>) edges); + this.edgeIterator.set((Iterator>) edges); return this.edgeIterator; } @@ -143,7 +145,7 @@ public abstract class MessagingFunction outValue; + private Tuple2 outValue; private IterationRuntimeContext runtimeContext; private Iterator edges; - private Collector> out; + private Collector> out; - private EdgesIterator edgeIterator; + private EdgesIterator edgeIterator; private boolean edgesUsed; - + + private long inDegree = -1; + + private long outDegree = -1; void init(IterationRuntimeContext context) { this.runtimeContext = context; - this.outValue = new Tuple2(); - this.edgeIterator = new EdgesIterator(); + this.outValue = new Tuple2(); + this.edgeIterator = new EdgesIterator(); } - void set(Iterator edges, Collector> out) { + void set(Iterator edges, Collector> out) { this.edges = edges; this.out = out; this.edgesUsed = false; } - private static final class EdgesIterator - implements Iterator>, Iterable> + private static final class EdgesIterator + implements Iterator>, Iterable> { - private Iterator> input; + private Iterator> input; - private Edge edge = new Edge(); + private Edge edge = new Edge(); - void set(Iterator> input) { + void set(Iterator> input) { this.input = input; } @@ -252,8 +257,8 @@ public abstract class MessagingFunction next() { - Edge next = input.next(); + public Edge next() { + Edge next = input.next(); edge.setSource(next.f0); edge.setTarget(next.f1); edge.setValue(next.f2); @@ -265,28 +270,34 @@ public abstract class MessagingFunction> iterator() { + public Iterator> iterator() { return this; } } /** - * In order to hide the Tuple3(actualValue, inDegree, outDegree) vertex value from the user, - * another function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}. - * - * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling - * the regular sendMessages function. - * - * @param newVertexState - * @throws Exception + * Retrieves the vertex in-degree (number of in-coming edges). + * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)} + * option has been set; -1 otherwise. */ - void sendMessagesFromVertexCentricIteration(Vertex> newVertexState) - throws Exception { - VertexWithDegrees vertex = new VertexWithDegrees(newVertexState.getId(), - newVertexState.getValue().f0); - vertex.setInDegree(newVertexState.getValue().f1); - vertex.setOutDegree(newVertexState.getValue().f2); + public long getInDegree() { + return inDegree; + } + + void setInDegree(long inDegree) { + this.inDegree = inDegree; + } + + /** + * Retrieve the vertex out-degree (number of out-going edges). + * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)} + * option has been set; -1 otherwise. + */ + public long getOutDegree() { + return outDegree; + } - sendMessages(vertex); + void setOutDegree(long outDegree) { + this.outDegree = outDegree; } } diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java index 915a3ee17dc489c47243744d298db327160b8c9e..f8e64b6cc48ffc660fcfbde6f58f0787bc673aaf 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java @@ -29,10 +29,8 @@ import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.common.functions.RichCoGroupFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.CoGroupOperator; import org.apache.flink.api.java.operators.CustomUnaryOperation; -import org.apache.flink.api.java.operators.DeltaIteration; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; @@ -43,7 +41,6 @@ import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.VertexWithDegrees; import org.apache.flink.util.Collector; import com.google.common.base.Preconditions; @@ -72,34 +69,33 @@ import com.google.common.base.Preconditions; * Vertex-centric graph iterations are are run by calling * {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}. * - * @param The type of the vertex key (the vertex identifier). - * @param The type of the vertex value (the state of the vertex). + * @param The type of the vertex key (the vertex identifier). + * @param The type of the vertex value (the state of the vertex). * @param The type of the message sent between vertices along the edges. - * @param The type of the values that are associated with the edges. + * @param The type of the values that are associated with the edges. */ -public class VertexCentricIteration - implements CustomUnaryOperation, Vertex> +public class VertexCentricIteration + implements CustomUnaryOperation, Vertex> { - private final VertexUpdateFunction updateFunction; + private final VertexUpdateFunction updateFunction; - private final MessagingFunction messagingFunction; + private final MessagingFunction messagingFunction; - private final DataSet> edgesWithValue; + private final DataSet> edgesWithValue; private final int maximumNumberOfIterations; private final TypeInformation messageType; - private DataSet> initialVertices; + private DataSet> initialVertices; private VertexCentricConfiguration configuration; - private DataSet>> verticesWithDegrees; // ---------------------------------------------------------------------------------- - private VertexCentricIteration(VertexUpdateFunction uf, - MessagingFunction mf, - DataSet> edgesWithValue, + private VertexCentricIteration(VertexUpdateFunction uf, + MessagingFunction mf, + DataSet> edgesWithValue, int maximumNumberOfIterations) { Preconditions.checkNotNull(uf); @@ -114,7 +110,7 @@ public class VertexCentricIteration this.messageType = getMessageType(mf); } - private TypeInformation getMessageType(MessagingFunction mf) { + private TypeInformation getMessageType(MessagingFunction mf) { return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null); } @@ -132,7 +128,7 @@ public class VertexCentricIteration * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet) */ @Override - public void setInput(DataSet> inputData) { + public void setInput(DataSet> inputData) { this.initialVertices = inputData; } @@ -142,17 +138,17 @@ public class VertexCentricIteration * @return The operator that represents this vertex-centric graph computation. */ @Override - public DataSet> createResult() { + public DataSet> createResult() { if (this.initialVertices == null) { throw new IllegalStateException("The input data set has not been set."); } // prepare some type information - TypeInformation keyType = ((TupleTypeInfo) initialVertices.getType()).getTypeAt(0); - TypeInformation> messageTypeInfo = new TupleTypeInfo>(keyType, messageType); + TypeInformation keyType = ((TupleTypeInfo) initialVertices.getType()).getTypeAt(0); + TypeInformation> messageTypeInfo = new TupleTypeInfo>(keyType, messageType); // create a graph - Graph graph = + Graph graph = Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment()); // check whether the numVertices option is set and, if so, compute the total number of vertices @@ -194,21 +190,21 @@ public class VertexCentricIteration * @param uf The function that updates the state of the vertices from the incoming messages. * @param mf The function that turns changed vertex states into messages along the edges. * - * @param The type of the vertex key (the vertex identifier). - * @param The type of the vertex value (the state of the vertex). + * @param The type of the vertex key (the vertex identifier). + * @param The type of the vertex value (the state of the vertex). * @param The type of the message sent between vertices along the edges. - * @param The type of the values that are associated with the edges. + * @param The type of the values that are associated with the edges. * * @return An in stance of the vertex-centric graph computation operator. */ - public static final - VertexCentricIteration withEdges( - DataSet> edgesWithValue, - VertexUpdateFunction uf, - MessagingFunction mf, + public static final + VertexCentricIteration withEdges( + DataSet> edgesWithValue, + VertexUpdateFunction uf, + MessagingFunction mf, int maximumNumberOfIterations) { - return new VertexCentricIteration(uf, mf, edgesWithValue, maximumNumberOfIterations); + return new VertexCentricIteration(uf, mf, edgesWithValue, maximumNumberOfIterations); } /** @@ -231,21 +227,21 @@ public class VertexCentricIteration // Wrapping UDFs // -------------------------------------------------------------------------------------------- - private static final class VertexUpdateUdf - extends RichCoGroupFunction, Vertex, Vertex> - implements ResultTypeQueryable> + private static abstract class VertexUpdateUdf extends RichCoGroupFunction< + Tuple2, Vertex, Vertex> + implements ResultTypeQueryable> { private static final long serialVersionUID = 1L; - final VertexUpdateFunction vertexUpdateFunction; + final VertexUpdateFunction vertexUpdateFunction; final MessageIterator messageIter = new MessageIterator(); - private transient TypeInformation> resultType; + private transient TypeInformation> resultType; - private VertexUpdateUdf(VertexUpdateFunction vertexUpdateFunction, - TypeInformation> resultType) + private VertexUpdateUdf(VertexUpdateFunction vertexUpdateFunction, + TypeInformation> resultType) { this.vertexUpdateFunction = vertexUpdateFunction; this.resultType = resultType; @@ -265,27 +261,26 @@ public class VertexCentricIteration } @Override - public TypeInformation> getProducedType() { + public TypeInformation> getProducedType() { return this.resultType; } } - private static final class VertexUpdateUdfSimpleVertexValue - extends VertexUpdateUdf { + @SuppressWarnings("serial") + private static final class VertexUpdateUdfSimpleVV extends VertexUpdateUdf { - - private VertexUpdateUdfSimpleVertexValue(VertexUpdateFunction vertexUpdateFunction, TypeInformation> resultType) { + private VertexUpdateUdfSimpleVV(VertexUpdateFunction vertexUpdateFunction, TypeInformation> resultType) { super(vertexUpdateFunction, resultType); } @Override - public void coGroup(Iterable> messages, - Iterable> vertex, - Collector> out) throws Exception { - final Iterator> vertexIter = vertex.iterator(); + public void coGroup(Iterable> messages, + Iterable> vertex, + Collector> out) throws Exception { + final Iterator> vertexIter = vertex.iterator(); if (vertexIter.hasNext()) { - Vertex vertexState = vertexIter.next(); + Vertex vertexState = vertexIter.next(); @SuppressWarnings("unchecked") Iterator> downcastIter = (Iterator>) (Iterator) messages.iterator(); @@ -295,11 +290,11 @@ public class VertexCentricIteration vertexUpdateFunction.updateVertex(vertexState, messageIter); } else { - final Iterator> messageIter = messages.iterator(); + final Iterator> messageIter = messages.iterator(); if (messageIter.hasNext()) { String message = "Target vertex does not exist!."; try { - Tuple2 next = messageIter.next(); + Tuple2 next = messageIter.next(); message = "Target vertex '" + next.f0 + "' does not exist!."; } catch (Throwable t) {} throw new Exception(message); @@ -310,36 +305,39 @@ public class VertexCentricIteration } } - private static final class VertexUpdateUdfVertexValueWithDegrees extends VertexUpdateUdf, VertexValue, Message> { - + @SuppressWarnings("serial") + private static final class VertexUpdateUdfVVWithDegrees extends VertexUpdateUdf, Message> { - private VertexUpdateUdfVertexValueWithDegrees(VertexUpdateFunction vertexUpdateFunction, TypeInformation>> resultType) { + private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction, Message> vertexUpdateFunction, + TypeInformation>> resultType) { super(vertexUpdateFunction, resultType); } - + @Override - public void coGroup(Iterable> messages, - Iterable>> vertex, - Collector>> out) throws Exception { - final Iterator>> vertexIter = vertex.iterator(); + public void coGroup(Iterable> messages, Iterable>> vertex, + Collector>> out) throws Exception { + final Iterator>> vertexIter = vertex.iterator(); + if (vertexIter.hasNext()) { - Vertex> vertexState = vertexIter.next(); - + Vertex> vertexWithDegrees = vertexIter.next(); + @SuppressWarnings("unchecked") Iterator> downcastIter = (Iterator>) (Iterator) messages.iterator(); messageIter.setSource(downcastIter); - vertexUpdateFunction.setOutputWithDegrees(vertexState, out); - vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexState, messageIter); + vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1); + vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2); + + vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out); + vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter); } else { - final Iterator> messageIter = messages.iterator(); + final Iterator> messageIter = messages.iterator(); if (messageIter.hasNext()) { String message = "Target vertex does not exist!."; try { - Tuple2 next = messageIter.next(); + Tuple2 next = messageIter.next(); message = "Target vertex '" + next.f0 + "' does not exist!."; } catch (Throwable t) {} throw new Exception(message); @@ -353,19 +351,19 @@ public class VertexCentricIteration /* * UDF that encapsulates the message sending function for graphs where the edges have an associated value. */ - private static final class MessagingUdfWithEdgeValues - extends RichCoGroupFunction, Vertex, Tuple2> - implements ResultTypeQueryable> + private static abstract class MessagingUdfWithEdgeValues + extends RichCoGroupFunction, Vertex, Tuple2> + implements ResultTypeQueryable> { private static final long serialVersionUID = 1L; - final MessagingFunction messagingFunction; + final MessagingFunction messagingFunction; - private transient TypeInformation> resultType; - - - private MessagingUdfWithEdgeValues(MessagingFunction messagingFunction, - TypeInformation> resultType) + private transient TypeInformation> resultType; + + + private MessagingUdfWithEdgeValues(MessagingFunction messagingFunction, + TypeInformation> resultType) { this.messagingFunction = messagingFunction; this.resultType = resultType; @@ -386,54 +384,62 @@ public class VertexCentricIteration } @Override - public TypeInformation> getProducedType() { + public TypeInformation> getProducedType() { return this.resultType; } } - private static final class MessagingUdfWithEdgeValuesSimpleVertexValue - extends MessagingUdfWithEdgeValues { + @SuppressWarnings("serial") + private static final class MessagingUdfWithEVsSimpleVV + extends MessagingUdfWithEdgeValues { - private MessagingUdfWithEdgeValuesSimpleVertexValue(MessagingFunction messagingFunction, - TypeInformation> resultType) { + private MessagingUdfWithEVsSimpleVV(MessagingFunction messagingFunction, + TypeInformation> resultType) { super(messagingFunction, resultType); } @Override - public void coGroup(Iterable> edges, - Iterable> state, - Collector> out) throws Exception { - final Iterator> stateIter = state.iterator(); - + public void coGroup(Iterable> edges, + Iterable> state, + Collector> out) throws Exception { + final Iterator> stateIter = state.iterator(); + if (stateIter.hasNext()) { - Vertex newVertexState = stateIter.next(); + Vertex newVertexState = stateIter.next(); messagingFunction.set((Iterator) edges.iterator(), out); messagingFunction.sendMessages(newVertexState); } } } - private static final class MessagingUdfWithEdgeValuesVertexValueWithDegrees - extends MessagingUdfWithEdgeValues, VertexValue, Message, EdgeValue> { + @SuppressWarnings("serial") + private static final class MessagingUdfWithEVsVVWithDegrees + extends MessagingUdfWithEdgeValues, VV, Message, EV> { + private Vertex nextVertex = new Vertex(); - private MessagingUdfWithEdgeValuesVertexValueWithDegrees - (MessagingFunction messagingFunction, - TypeInformation> resultType) { + private MessagingUdfWithEVsVVWithDegrees(MessagingFunction messagingFunction, + TypeInformation> resultType) { super(messagingFunction, resultType); } @Override - public void coGroup(Iterable> edges, - Iterable>> state, - Collector> out) throws Exception { - - final Iterator>> stateIter = state.iterator(); + public void coGroup(Iterable> edges, Iterable>> state, + Collector> out) throws Exception { + final Iterator>> stateIter = state.iterator(); + if (stateIter.hasNext()) { - Vertex> newVertexState = stateIter.next(); + Vertex> vertexWithDegrees = stateIter.next(); + + nextVertex.setField(vertexWithDegrees.f0, 0); + nextVertex.setField(vertexWithDegrees.f1.f0, 1); + + messagingFunction.setInDegree(vertexWithDegrees.f1.f1); + messagingFunction.setOutDegree(vertexWithDegrees.f1.f2); + messagingFunction.set((Iterator) edges.iterator(), out); - messagingFunction.sendMessagesFromVertexCentricIteration(newVertexState); + messagingFunction.sendMessages(nextVertex); } } } @@ -454,15 +460,14 @@ public class VertexCentricIteration * @param equalToArg the argument for the equalTo within the coGroup * @return the messaging function */ - private CoGroupOperator> buildMessagingFunction( - DeltaIteration, Vertex> iteration, - TypeInformation> messageTypeInfo, int whereArg, int equalToArg) { + private CoGroupOperator> buildMessagingFunction( + DeltaIteration, Vertex> iteration, + TypeInformation> messageTypeInfo, int whereArg, int equalToArg) { // build the messaging function (co group) - CoGroupOperator> messages; - MessagingUdfWithEdgeValues messenger = - new MessagingUdfWithEdgeValuesSimpleVertexValue( - messagingFunction, messageTypeInfo); + CoGroupOperator> messages; + MessagingUdfWithEdgeValues messenger = + new MessagingUdfWithEVsSimpleVV(messagingFunction, messageTypeInfo); messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg) .equalTo(equalToArg).with(messenger); @@ -489,16 +494,14 @@ public class VertexCentricIteration * @param equalToArg the argument for the equalTo within the coGroup * @return the messaging function */ - private CoGroupOperator> buildMessagingFunctionVerticesWithDegrees( - DeltaIteration>, - Vertex>> iteration, - TypeInformation> messageTypeInfo, int whereArg, int equalToArg) { + private CoGroupOperator> buildMessagingFunctionVerticesWithDegrees( + DeltaIteration>, Vertex>> iteration, + TypeInformation> messageTypeInfo, int whereArg, int equalToArg) { // build the messaging function (co group) - CoGroupOperator> messages; - MessagingUdfWithEdgeValues, VertexValue, Message, EdgeValue> messenger = - new MessagingUdfWithEdgeValuesVertexValueWithDegrees( - messagingFunction, messageTypeInfo); + CoGroupOperator> messages; + MessagingUdfWithEdgeValues, VV, Message, EV> messenger = + new MessagingUdfWithEVsVVWithDegrees(messagingFunction, messageTypeInfo); messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg) .equalTo(equalToArg).with(messenger); @@ -518,17 +521,10 @@ public class VertexCentricIteration /** * Helper method which sets up an iteration with the given vertex value(either simple or with degrees) * - * @param vertices - * @param + * @param iteration */ - private DeltaIteration, Vertex> setUpIteration( - DataSet> vertices) { - - final int[] zeroKeyPos = new int[] {0}; - - final DeltaIteration, Vertex> iteration = - vertices.iterateDelta(vertices, this.maximumNumberOfIterations, zeroKeyPos); + private void setUpIteration(DeltaIteration iteration) { // set up the iteration operator if (this.configuration != null) { @@ -546,8 +542,6 @@ public class VertexCentricIteration // no configuration provided; set default name iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"); } - - return iteration; } /** @@ -557,14 +551,16 @@ public class VertexCentricIteration * @param messageTypeInfo * @return the operator */ - private DataSet> createResultSimpleVertex(EdgeDirection messagingDirection, - TypeInformation> messageTypeInfo) { - DataSet> messages; + private DataSet> createResultSimpleVertex(EdgeDirection messagingDirection, + TypeInformation> messageTypeInfo) { + + DataSet> messages; - TypeInformation> vertexTypes = initialVertices.getType(); + TypeInformation> vertexTypes = initialVertices.getType(); - final DeltaIteration, Vertex> iteration = - setUpIteration(this.initialVertices); + final DeltaIteration, Vertex> iteration = + initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0); + setUpIteration(iteration); switch (messagingDirection) { case IN: @@ -581,11 +577,11 @@ public class VertexCentricIteration throw new IllegalArgumentException("Illegal edge direction"); } - VertexUpdateUdf updateUdf = - new VertexUpdateUdfSimpleVertexValue(updateFunction, vertexTypes); + VertexUpdateUdf updateUdf = + new VertexUpdateUdfSimpleVV(updateFunction, vertexTypes); // build the update function (co group) - CoGroupOperator> updates = + CoGroupOperator> updates = messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf); configureUpdateFunction(updates); @@ -602,46 +598,44 @@ public class VertexCentricIteration * @param messageTypeInfo * @return the operator */ - private DataSet> createResultVerticesWithDegrees( - Graph graph, - EdgeDirection messagingDirection, - TypeInformation> messageTypeInfo) { + @SuppressWarnings("serial") + private DataSet> createResultVerticesWithDegrees(Graph graph, EdgeDirection messagingDirection, + TypeInformation> messageTypeInfo) { - DataSet> messages; + DataSet> messages; this.updateFunction.setOptDegrees(this.configuration.isOptDegrees()); - DataSet> inDegrees = graph.inDegrees(); - DataSet> outDegrees = graph.outDegrees(); + DataSet> inDegrees = graph.inDegrees(); + DataSet> outDegrees = graph.outDegrees(); - DataSet> degrees = inDegrees.join(outDegrees).where(0).equalTo(0) - .with(new FlatJoinFunction, Tuple2, Tuple3>() { + DataSet> degrees = inDegrees.join(outDegrees).where(0).equalTo(0) + .with(new FlatJoinFunction, Tuple2, Tuple3>() { @Override - public void join(Tuple2 first, Tuple2 second, Collector> out) throws Exception { - out.collect(new Tuple3(first.f0, first.f1, second.f1)); + public void join(Tuple2 first, Tuple2 second, Collector> out) { + out.collect(new Tuple3(first.f0, first.f1, second.f1)); } - }); + }).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1"); - DataSet>> verticesWithDegrees= initialVertices + DataSet>> verticesWithDegrees = initialVertices .join(degrees).where(0).equalTo(0) - .with(new FlatJoinFunction, Tuple3, Vertex>>() { + .with(new FlatJoinFunction, Tuple3, Vertex>>() { @Override - public void join(Vertex vertex, - Tuple3 degrees, - Collector>> out) throws Exception { + public void join(Vertex vertex, Tuple3 degrees, + Collector>> out) throws Exception { - out.collect(new VertexWithDegrees>(vertex.getId(), - new Tuple3(vertex.getValue(), degrees.f1, degrees.f2))); + out.collect(new Vertex>(vertex.getId(), + new Tuple3(vertex.getValue(), degrees.f1, degrees.f2))); } - }); + }).withForwardedFieldsFirst("f0"); // add type info - TypeInformation>> vertexTypes = verticesWithDegrees.getType(); + TypeInformation>> vertexTypes = verticesWithDegrees.getType(); - final DeltaIteration>, - Vertex>> iteration = - setUpIteration(verticesWithDegrees); + final DeltaIteration>, Vertex>> iteration = + verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0); + setUpIteration(iteration); switch (messagingDirection) { case IN: @@ -658,24 +652,26 @@ public class VertexCentricIteration throw new IllegalArgumentException("Illegal edge direction"); } - VertexUpdateUdf, VertexValue, Message> updateUdf = - new VertexUpdateUdfVertexValueWithDegrees(updateFunction, vertexTypes); + @SuppressWarnings({ "unchecked", "rawtypes" }) + VertexUpdateUdf, Message> updateUdf = + new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes); // build the update function (co group) - CoGroupOperator>> updates = + CoGroupOperator>> updates = messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf); configureUpdateFunction(updates); - return iteration.closeWith(updates, updates).map(new MapFunction>, Vertex>() { - @Override - public Vertex map(Vertex> vertex) throws Exception { - return new Vertex(vertex.getId(), vertex.getValue().f0); - } - }); + return iteration.closeWith(updates, updates).map( + new MapFunction>, Vertex>() { + + public Vertex map(Vertex> vertex) { + return new Vertex(vertex.getId(), vertex.getValue().f0); + } + }); } - private void configureUpdateFunction(CoGroupOperator> updates) { + private void configureUpdateFunction(CoGroupOperator> updates) { // configure coGroup update function with name and broadcast variables updates = updates.name("Vertex State Updates"); @@ -688,4 +684,4 @@ public class VertexCentricIteration // let the operator know that we preserve the key field updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0"); } -} +} \ No newline at end of file diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java index bc2e857f0df167fad4e5c2dc9b1812ec4eb14339..9930b50de7ad1b258d119891b5140a329ba11b90 100644 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java +++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java @@ -24,9 +24,7 @@ import java.util.Collection; import org.apache.flink.api.common.aggregators.Aggregator; import org.apache.flink.api.common.functions.IterationRuntimeContext; import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.InaccessibleMethodException; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.VertexWithDegrees; import org.apache.flink.types.Value; import org.apache.flink.util.Collector; @@ -35,11 +33,11 @@ import org.apache.flink.util.Collector; * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is * invoked once per vertex per superstep. * - * The vertex key type. - * The vertex value type. + * The vertex key type. + * The vertex value type. * The message type. */ -public abstract class VertexUpdateFunction implements Serializable { +public abstract class VertexUpdateFunction implements Serializable { private static final long serialVersionUID = 1L; @@ -50,11 +48,12 @@ public abstract class VertexUpdateFunction impl private long numberOfVertices = -1L; - public long getNumberOfVertices() throws Exception{ - if (numberOfVertices == -1) { - throw new InaccessibleMethodException("The number of vertices option is not set. " + - "To access the number of vertices, call iterationConfiguration.setOptNumVertices(true)."); - } + /** + * Retrieves the number of vertices in the graph. + * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)} + * option has been set; -1 otherwise. + */ + public long getNumberOfVertices() { return numberOfVertices; } @@ -66,7 +65,7 @@ public abstract class VertexUpdateFunction impl private boolean optDegrees; - public boolean isOptDegrees() { + boolean isOptDegrees() { return optDegrees; } @@ -80,7 +79,7 @@ public abstract class VertexUpdateFunction impl /** * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as - * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex + * the incoming messages. It may set a new vertex state via {@link #setNewVV(Object)}. If the vertex * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}. * * @param vertex The vertex. @@ -88,7 +87,7 @@ public abstract class VertexUpdateFunction impl * * @throws Exception The computation may throw exceptions, which causes the superstep to fail. */ - public abstract void updateVertex(Vertex vertex, MessageIterator inMessages) throws Exception; + public abstract void updateVertex(Vertex vertex, MessageIterator inMessages) throws Exception; /** * This method is executed one per superstep before the vertex update function is invoked for each vertex. @@ -109,7 +108,7 @@ public abstract class VertexUpdateFunction impl * * @param newValue The new vertex value. */ - public void setNewVertexValue(VertexValue newValue) { + public void setNewVertexValue(VV newValue) { if(isOptDegrees()) { outValWithDegrees.f1.f0 = newValue; outWithDegrees.collect(outValWithDegrees); @@ -167,30 +166,58 @@ public abstract class VertexUpdateFunction impl private IterationRuntimeContext runtimeContext; - private Collector> out; + private Collector> out; - private Collector>> outWithDegrees; + private Collector>> outWithDegrees; - private Vertex outVal; + private Vertex outVal; - private Vertex> outValWithDegrees; + private Vertex> outValWithDegrees; + private long inDegree = -1; + + private long outDegree = -1; void init(IterationRuntimeContext context) { this.runtimeContext = context; } + void setOutput(Vertex outVal, Collector> out) { + this.outVal = outVal; + this.out = out; + } + @SuppressWarnings({ "unchecked", "rawtypes" }) + void setOutputWithDegrees(Vertex outVal, + Collector out) { + this.outValWithDegrees = (Vertex>) outVal; + this.outWithDegrees = out; + } - void setOutputWithDegrees(Vertex> outValWithDegrees, - Collector>> outWithDegrees) { - this.outValWithDegrees = outValWithDegrees; - this.outWithDegrees = outWithDegrees; + /** + * Retrieves the vertex in-degree (number of in-coming edges). + * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)} + * option has been set; -1 otherwise. + */ + public long getInDegree() { + return inDegree; } - void setOutput(Vertex outVal, Collector> out) { - this.outVal = outVal; - this.out = out; + void setInDegree(long inDegree) { + this.inDegree = inDegree; + } + + /** + * Retrieve the vertex out-degree (number of out-going edges). + * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)} + * option has been set; -1 otherwise. + */ + public long getOutDegree() { + return outDegree; + } + + void setOutDegree(long outDegree) { + this.outDegree = outDegree; } /** @@ -204,12 +231,12 @@ public abstract class VertexUpdateFunction impl * @param inMessages * @throws Exception */ - void updateVertexFromVertexCentricIteration(Vertex> vertexState, + @SuppressWarnings("unchecked") + void updateVertexFromVertexCentricIteration(Vertex vertexState, MessageIterator inMessages) throws Exception { - VertexWithDegrees vertex = new VertexWithDegrees(vertexState.getId(), - vertexState.getValue().f0); - vertex.setInDegree(vertexState.getValue().f1); - vertex.setOutDegree(vertexState.getValue().f2); + + Vertex vertex = new Vertex(vertexState.f0, + ((Tuple3)vertexState.getValue()).f0); updateVertex(vertex, inMessages); } diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java index e93f581d5a39e2bb6462cd046174d69890e6ae5c..3fbd0bc2cf702b38d615c894e491523d2880d1b4 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.spargel.MessageIterator; diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java index 5f5f8b29d562fac9a2008e85cce4f3346e205b15..ca5d5d968e7dbcaa4c694c6f67d14951847d29ae 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java @@ -31,7 +31,6 @@ import org.apache.flink.graph.gsa.GatherFunction; import org.apache.flink.graph.gsa.GatherSumApplyIteration; import org.apache.flink.graph.gsa.Neighbor; import org.apache.flink.graph.gsa.SumFunction; -import org.apache.flink.graph.IterationConfiguration; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.apache.flink.types.LongValue; import org.junit.After; diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java index 1c3690670134ebe4c54284fb40703ff151974c4c..567c01501fe268b88519fcb665c94c51d95bf448 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java @@ -26,13 +26,10 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.EdgeDirection; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.IterationConfiguration; import org.apache.flink.graph.spargel.MessageIterator; import org.apache.flink.graph.spargel.MessagingFunction; import org.apache.flink.graph.spargel.VertexCentricConfiguration; @@ -48,6 +45,7 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.apache.flink.graph.utils.VertexToTuple2Map; @RunWith(Parameterized.class) public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { @@ -88,6 +86,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3)); parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6)); parameters.registerAggregator("superstepAggregator", new LongSumAggregator()); + parameters.setOptNumVertices(true); Graph result = graph.runVertexCentricIteration( new UpdateFunction(), new MessageFunction(), 10, parameters); @@ -135,6 +134,29 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { "5 15"; } + @Test + public void testDefaultConfiguration() throws Exception { + /* + * Test Graph's runVertexCentricIteration when configuration parameters are not provided + * i.e. degrees and numVertices will be -1, EdgeDirection will be OUT. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), + TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper()); + + Graph result = graph.runVertexCentricIteration( + new UpdateFunctionDefault(), new MessageFunctionDefault(), 5); + + result.getVertices().map(new VertexToTuple2Map()).writeAsCsv(resultPath, "\n", "\t"); + env.execute(); + expectedResult = "1 6\n" + + "2 6\n" + + "3 6\n" + + "4 6\n" + + "5 6"; + } + @Test public void testIterationDefaultDirection() throws Exception { @@ -176,7 +198,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { .mapVertices(new InitialiseHashSetMapper()); // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); + VertexCentricConfiguration parameters = new VertexCentricConfiguration(); parameters.setDirection(EdgeDirection.IN); @@ -208,7 +230,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { .mapVertices(new InitialiseHashSetMapper()); // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); + VertexCentricConfiguration parameters = new VertexCentricConfiguration(); parameters.setDirection(EdgeDirection.ALL); @@ -226,12 +248,37 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { "5 [1, 3, 4]"; } + @Test + public void testNumVerticesNotSet() throws Exception { + + /* + * Test that if the number of vertices option is not set, -1 is returned as value. + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), + TestGraphUtils.getLongLongEdges(), env); + + DataSet> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(), + new DummyMessageFunction(), 2).getVertices(); + + verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t"); + env.execute(); + + expectedResult = "1 -1\n" + + "2 -1\n" + + "3 -1\n" + + "4 -1\n" + + "5 -1"; + } + @Test public void testInDegreesSet() throws Exception { /* - * Test that if the degrees are set, the in degrees can be accessed in every superstep and the value - * is correctly computed. + * Test that if the degrees are set, they can be accessed in every superstep + * inside the update function and the value + * is correctly computed for degrees in the messaging function. */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -239,14 +286,14 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { TestGraphUtils.getLongLongEdges(), env); // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); + VertexCentricConfiguration parameters = new VertexCentricConfiguration(); parameters.setOptDegrees(true); - DataSet> verticesWithInDegree = graph.runVertexCentricIteration(new UpdateFunctionInDegree(), - new DummyMessageFunction(), 5, parameters).getVertices(); + DataSet> verticesWithDegrees = graph.runVertexCentricIteration( + new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); - verticesWithInDegree.writeAsCsv(resultPath, "\n", "\t"); + verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); env.execute(); expectedResult = "1 1\n" + @@ -257,41 +304,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { } @Test - public void testOutDegreesSet() throws Exception { + public void testInDegreesNotSet() throws Exception { /* - * Test that if the degrees are set, the out degrees can be accessed in every superstep and the value - * is correctly computed. + * Test that if the degrees option is not set, then -1 is returned as a value for in-degree. */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env); - // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); - - parameters.setOptDegrees(true); + DataSet> verticesWithDegrees = graph.runVertexCentricIteration( + new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); - DataSet> verticesWithOutDegree = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(), - new DummyMessageFunction(), 5, parameters).getVertices(); - - verticesWithOutDegree.writeAsCsv(resultPath, "\n", "\t"); + verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); env.execute(); - expectedResult = "1 2\n" + - "2 1\n" + - "3 2\n" + - "4 1\n" + - "5 1"; + expectedResult = "1 -1\n" + + "2 -1\n" + + "3 -1\n" + + "4 -1\n" + + "5 -1"; } @Test - public void testNumVerticesSet() throws Exception { + public void testOutDegreesSet() throws Exception { /* - * Test that if the number of vertices option is set, it can be accessed in every superstep and the value - * is correctly computed. + * Test that if the degrees are set, they can be accessed in every superstep + * inside the update function and the value + * is correctly computed for degrees in the messaging function. */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); @@ -299,56 +341,45 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { TestGraphUtils.getLongLongEdges(), env); // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); + VertexCentricConfiguration parameters = new VertexCentricConfiguration(); - parameters.setOptNumVertices(true); + parameters.setOptDegrees(true); - DataSet> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(), - new DummyMessageFunction(), 5, parameters).getVertices(); + DataSet> verticesWithDegrees = graph.runVertexCentricIteration( + new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices(); - verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t"); + verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); env.execute(); - expectedResult = "1 5\n" + - "2 5\n" + - "3 5\n" + - "4 5\n" + - "5 5"; + expectedResult = "1 2\n" + + "2 1\n" + + "3 2\n" + + "4 1\n" + + "5 1"; } @Test - public void testDegrees() throws Exception { + public void testOutDegreesNotSet() throws Exception { /* - * Test that if the degrees are set, they can be accessed in every superstep and the value - * is correctly computed for both in and out degrees. + * Test that if the degrees option is not set, then -1 is returned as a value for out-degree. */ final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph, Long> graph = Graph.fromCollection(TestGraphUtils.getLongVerticesWithDegrees(), + Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), TestGraphUtils.getLongLongEdges(), env); - // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); - - parameters.setOptDegrees(true); - - DataSet>> verticesWithDegrees = graph.runVertexCentricIteration( - new UpdateFunctionDegrees(), new DegreeMessageFunction(), 5, parameters).getVertices(); + DataSet> verticesWithDegrees = graph.runVertexCentricIteration( + new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices(); - verticesWithDegrees.map(new MapFunction>, Tuple2>() { - @Override - public Tuple2 map(Vertex> vertex) throws Exception { - return new Tuple2(vertex.getId(), vertex.getValue().f2); - } - }).writeAsCsv(resultPath, "\n", "\t"); + verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t"); env.execute(); - expectedResult = "1 true\n" + - "2 true\n" + - "3 true\n" + - "4 true\n" + - "5 true"; + expectedResult = "1 -1\n" + + "2 -1\n" + + "3 -1\n" + + "4 -1\n" + + "5 -1"; } @Test @@ -364,7 +395,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { TestGraphUtils.getLongLongEdges(), env); // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); + VertexCentricConfiguration parameters = new VertexCentricConfiguration(); parameters.setOptDegrees(true); parameters.setDirection(EdgeDirection.ALL); @@ -399,12 +430,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { // test aggregator aggregator = getIterationAggregator("superstepAggregator"); + + // test number of vertices + Assert.assertEquals(5, getNumberOfVertices()); + } @Override public void updateVertex(Vertex vertex, MessageIterator inMessages) { long superstep = getSuperstepNumber(); aggregator.aggregate(superstep); + + setNewVertexValue(vertex.getValue() + 1); + } + } + + @SuppressWarnings("serial") + public static final class UpdateFunctionDefault extends VertexUpdateFunction { + + LongSumAggregator aggregator = new LongSumAggregator(); + + @Override + public void updateVertex(Vertex vertex, MessageIterator inMessages) { + + // test number of vertices + Assert.assertEquals(-1, getNumberOfVertices()); + + // test degrees + Assert.assertEquals(-1, getInDegree()); + Assert.assertEquals(-1, getOutDegree()); + setNewVertexValue(vertex.getValue() + 1); } } @@ -421,6 +476,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { Assert.assertEquals(4, bcastSet.get(0)); Assert.assertEquals(5, bcastSet.get(1)); Assert.assertEquals(6, bcastSet.get(2)); + + // test number of vertices + Assert.assertEquals(5, getNumberOfVertices()); // test aggregator if (getSuperstepNumber() == 2) { @@ -437,28 +495,18 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class UpdateFunctionInDegree extends VertexUpdateFunction { + public static final class MessageFunctionDefault extends MessagingFunction { @Override - public void updateVertex(Vertex vertex, MessageIterator inMessages) { - try { - setNewVertexValue(((VertexWithDegrees) vertex).getInDegree()); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - @SuppressWarnings("serial") - public static final class UpdateFunctionOutDegree extends VertexUpdateFunction { + public void sendMessages(Vertex vertex) { + // test number of vertices + Assert.assertEquals(-1, getNumberOfVertices()); - @Override - public void updateVertex(Vertex vertex, MessageIterator inMessages) { - try { - setNewVertexValue(((VertexWithDegrees) vertex).getOutDegree()); - } catch (Exception e) { - e.printStackTrace(); - } + // test degrees + Assert.assertEquals(-1, getInDegree()); + Assert.assertEquals(-1, getOutDegree()); + //send message to keep vertices active + sendMessageToAllNeighbors(vertex.getValue()); } } @@ -467,11 +515,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { @Override public void updateVertex(Vertex vertex, MessageIterator inMessages) { - try { setNewVertexValue(getNumberOfVertices()); - } catch (Exception e) { - e.printStackTrace(); - } } } @@ -495,8 +539,25 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { } @SuppressWarnings("serial") - public static final class VertexUpdateDirection extends VertexUpdateFunction, - Long> { + public static final class DegreesMessageFunction extends MessagingFunction { + + @Override + public void sendMessages(Vertex vertex) { + if (vertex.getId().equals(1)) { + Assert.assertEquals(2, getOutDegree()); + Assert.assertEquals(1, getInDegree()); + } + else if(vertex.getId().equals(3)) { + Assert.assertEquals(2, getOutDegree()); + Assert.assertEquals(2, getInDegree()); + } + //send message to keep vertices active + sendMessageToAllNeighbors(vertex.getValue()); + } + } + + @SuppressWarnings("serial") + public static final class VertexUpdateDirection extends VertexUpdateFunction, Long> { @Override public void updateVertex(Vertex> vertex, MessageIterator messages) throws Exception { @@ -510,6 +571,26 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { } } + @SuppressWarnings("serial") + public static final class UpdateFunctionInDegrees extends VertexUpdateFunction { + + @Override + public void updateVertex(Vertex vertex, MessageIterator inMessages) { + long inDegree = getInDegree(); + setNewVertexValue(inDegree); + } + } + + @SuppressWarnings("serial") + public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction { + + @Override + public void updateVertex(Vertex vertex, MessageIterator inMessages) { + long outDegree = getOutDegree(); + setNewVertexValue(outDegree); + } + } + @SuppressWarnings("serial") public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction { @@ -519,25 +600,21 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { long count = 0; - for(long msg : messages) { + for(@SuppressWarnings("unused") long msg : messages) { count++; } - - setNewVertexValue(count == (((VertexWithDegrees)vertex).getInDegree() + ((VertexWithDegrees)vertex).getOutDegree())); + setNewVertexValue(count == (getInDegree() + getOutDegree())); } } @SuppressWarnings("serial") - public static final class UpdateFunctionDegrees extends VertexUpdateFunction, Long> { + public static final class UpdateFunctionDegrees extends VertexUpdateFunction { @Override - public void updateVertex(Vertex> vertex, MessageIterator inMessages) { - try { - setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (((VertexWithDegrees)vertex).getInDegree() == vertex.getValue().f0) - && (((VertexWithDegrees)vertex).getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2)); - } catch (Exception e) { - e.printStackTrace(); - } + public void updateVertex(Vertex vertex, MessageIterator inMessages) { + long inDegree = getInDegree(); + long outDegree = getOutDegree(); + setNewVertexValue(inDegree + outDegree); } } @@ -593,16 +670,6 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase { } } - @SuppressWarnings("serial") - public static final class DegreeMessageFunction extends MessagingFunction, Long, Long> { - - @Override - public void sendMessages(Vertex> vertex) { - //send message to keep vertices active - sendMessageToAllNeighbors(vertex.getValue().f0); - } - } - @SuppressWarnings("serial") public static final class AssignOneMapper implements MapFunction, Long> { diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java deleted file mode 100644 index 5c57f47fd93eae0ee9ede8a4b45e7618f21d3ca2..0000000000000000000000000000000000000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.io.DiscardingOutputFormat; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.VertexWithDegrees; -import org.apache.flink.graph.spargel.MessageIterator; -import org.apache.flink.graph.spargel.MessagingFunction; -import org.apache.flink.graph.spargel.VertexUpdateFunction; -import org.apache.flink.test.util.ForkableFlinkMiniCluster; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import static org.junit.Assert.fail; - -public class VertexCentricConfigurationWithExceptionITCase { - - private static final int PARALLELISM = 4; - - private static ForkableFlinkMiniCluster cluster; - - - @BeforeClass - public static void setupCluster() { - try { - Configuration config = new Configuration(); - config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM); - cluster = new ForkableFlinkMiniCluster(config, false); - } - catch (Exception e) { - e.printStackTrace(); - fail("Error starting test cluster: " + e.getMessage()); - } - } - - @AfterClass - public static void tearDownCluster() { - try { - cluster.stop(); - } - catch (Throwable t) { - t.printStackTrace(); - fail("Cluster shutdown caused an exception: " + t.getMessage()); - } - } - - @Test - public void testOutDegreesNotSet() throws Exception { - - /* - * Test that if the degrees are not set, the out degrees cannot be accessed - an - * exception is thrown. - */ - final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getJobManagerRPCPort()); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - - Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), - TestGraphUtils.getLongLongEdges(), env); - - try { - DataSet> verticesWithOutDegrees = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(), - new DummyMessageFunction(), 5).getVertices(); - - verticesWithOutDegrees.output(new DiscardingOutputFormat>()); - env.execute(); - - fail("The degree option not set test did not fail"); - } catch (Exception e) { - // We expect the job to fail with an exception - } - } - - @Test - public void testInDegreesNotSet() throws Exception { - - /* - * Test that if the degrees are not set, the in degrees cannot be accessed - an - * exception is thrown. - */ - final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getJobManagerRPCPort()); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - - Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), - TestGraphUtils.getLongLongEdges(), env); - - try { - DataSet> verticesWithInDegrees = graph.runVertexCentricIteration(new UpdateFunctionInDegree(), - new DummyMessageFunction(), 5).getVertices(); - - verticesWithInDegrees.output(new DiscardingOutputFormat>()); - env.execute(); - - fail("The degree option not set test did not fail"); - } catch (Exception e) { - // We expect the job to fail with an exception - } - } - - @Test - public void testNumVerticesNotSet() throws Exception { - - /* - * Test that if the number of vertices option is not set, this number cannot be accessed - - * an exception id thrown. - */ - final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment( - "localhost", cluster.getJobManagerRPCPort()); - env.setParallelism(PARALLELISM); - env.getConfig().disableSysoutLogging(); - - Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), - TestGraphUtils.getLongLongEdges(), env); - - try { - DataSet> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(), - new DummyMessageFunction(), 5).getVertices(); - - verticesWithNumVertices.output(new DiscardingOutputFormat>()); - env.execute(); - - fail("The num vertices option not set test did not fail"); - } catch (Exception e) { - // We expect the job to fail with an exception - } - } - - @SuppressWarnings("serial") - public static final class UpdateFunctionInDegree extends VertexUpdateFunction { - - @Override - public void updateVertex(Vertex vertex, MessageIterator inMessages) throws Exception { - setNewVertexValue(((VertexWithDegrees) vertex).getInDegree()); - } - } - - @SuppressWarnings("serial") - public static final class UpdateFunctionOutDegree extends VertexUpdateFunction { - - @Override - public void updateVertex(Vertex vertex, MessageIterator inMessages) throws Exception { - setNewVertexValue(((VertexWithDegrees)vertex).getOutDegree()); - } - } - - @SuppressWarnings("serial") - public static final class UpdateFunctionNumVertices extends VertexUpdateFunction { - - @Override - public void updateVertex(Vertex vertex, MessageIterator inMessages) throws Exception { - setNewVertexValue(getNumberOfVertices()); - } - } - - @SuppressWarnings("serial") - public static final class DummyMessageFunction extends MessagingFunction { - - @Override - public void sendMessages(Vertex vertex) { - //send message to keep vertices active - sendMessageToAllNeighbors(vertex.getValue()); - } - } -} diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java index 17a4cb03c00c8f43a2927ee2b7213a16fcb921f5..f2f3d8c68edc97a17dcba2e24d707f2165cc5906 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java @@ -66,6 +66,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes result.writeAsCsv(resultPath, "\n", " "); env.execute(); } + /** * A map function that takes a Long value and creates a 2-tuple out of it: *
(Long value) -> (value, value)
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java index 47f8fe55f3e12378d99a284fe68aaa2480525af0..a0042a8ba138fb9b32e6207fd93319816fc0fbb2 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java @@ -28,7 +28,7 @@ import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.graph.example.IncrementalSSSPExample; import org.apache.flink.graph.example.utils.IncrementalSSSPData; -import org.apache.flink.graph.spargel.IterationConfiguration; +import org.apache.flink.graph.spargel.VertexCentricConfiguration; import org.apache.flink.test.util.MultipleProgramsTestBase; import org.junit.After; import org.junit.Before; @@ -101,7 +101,7 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase { graph.removeEdge(edgeToBeRemoved); // configure the iteration - IterationConfiguration parameters = new IterationConfiguration(); + VertexCentricConfiguration parameters = new VertexCentricConfiguration(); if(IncrementalSSSPExample.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {