提交 40749ddc 编写于 作者: G Greg Hogan

[FLINK-3277] [gelly] Use Value types in Gelly API

This closes #1671
上级 10898a90
......@@ -346,13 +346,13 @@ DataSet<K> getVertexIds()
DataSet<Tuple2<K, K>> getEdgeIds()
// get a DataSet of <vertex ID, in-degree> pairs for all vertices
DataSet<Tuple2<K, Long>> inDegrees()
DataSet<Tuple2<K, LongValue>> inDegrees()
// get a DataSet of <vertex ID, out-degree> pairs for all vertices
DataSet<Tuple2<K, Long>> outDegrees()
DataSet<Tuple2<K, LongValue>> outDegrees()
// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
DataSet<Tuple2<K, Long>> getDegrees()
DataSet<Tuple2<K, LongValue>> getDegrees()
// get the number of vertices
long numberOfVertices()
......@@ -381,13 +381,13 @@ getVertexIds: DataSet[K]
getEdgeIds: DataSet[(K, K)]
// get a DataSet of <vertex ID, in-degree> pairs for all vertices
inDegrees: DataSet[(K, Long)]
inDegrees: DataSet[(K, LongValue)]
// get a DataSet of <vertex ID, out-degree> pairs for all vertices
outDegrees: DataSet[(K, Long)]
outDegrees: DataSet[(K, LongValue)]
// get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
getDegrees: DataSet[(K, Long)]
getDegrees: DataSet[(K, LongValue)]
// get the number of vertices
numberOfVertices: Long
......@@ -519,13 +519,13 @@ Note that if the input dataset contains a key multiple times, all Gelly join met
{% highlight java %}
Graph<Long, Double, Double> network = ...
DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
DataSet<Tuple2<Long, LongValue>> vertexOutDegrees = network.outDegrees();
// assign the transition probabilities as the edge weights
Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
new VertexJoinFunction<Double, Long>() {
public Double vertexJoin(Double vertexValue, Long inputValue) {
return vertexValue / inputValue;
new VertexJoinFunction<Double, LongValue>() {
public Double vertexJoin(Double vertexValue, LongValue inputValue) {
return vertexValue / inputValue.getValue();
}
});
{% endhighlight %}
......@@ -535,10 +535,10 @@ Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(v
{% highlight scala %}
val network: Graph[Long, Double, Double] = ...
val vertexOutDegrees: DataSet[(Long, Long)] = network.outDegrees
val vertexOutDegrees: DataSet[(Long, LongValue)] = network.outDegrees
// assign the transition probabilities as the edge weights
val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: Long) => v1 / v2)
val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: LongValue) => v1 / v2.getValue)
{% endhighlight %}
</div>
</div>
......
......@@ -18,7 +18,6 @@
package org.apache.flink.graph.examples;
import org.apache.flink.graph.examples.utils.ExampleUtils;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
......@@ -27,6 +26,8 @@ import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.examples.utils.ExampleUtils;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
/**
......@@ -66,7 +67,7 @@ public class GraphMetrics implements ProgramDescription {
long numEdges = graph.numberOfEdges();
/** compute the average node degree **/
DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees();
DataSet<Double> avgNodeDegree = verticesWithDegrees
.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
......@@ -96,7 +97,7 @@ public class GraphMetrics implements ProgramDescription {
}
@SuppressWarnings("serial")
private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> {
private long numberOfVertices;
......@@ -104,14 +105,14 @@ public class GraphMetrics implements ProgramDescription {
this.numberOfVertices = numberOfVertices;
}
public Double map(Tuple2<Long, Long> sumTuple) {
return (double) (sumTuple.f1 / numberOfVertices) ;
public Double map(Tuple2<Long, LongValue> sumTuple) {
return (double) (sumTuple.f1.getValue() / numberOfVertices) ;
}
}
@SuppressWarnings("serial")
private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> {
public Long map(Tuple2<Long, Long> value) { return value.f0; }
private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> {
public Long map(Tuple2<Long, LongValue> value) { return value.f0; }
}
@Override
......
......@@ -61,7 +61,7 @@ object GraphMetrics {
/** compute the average node degree **/
val verticesWithDegrees = graph.getDegrees
val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble)
/** find the vertex with the maximum in-degree **/
val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
......
......@@ -18,24 +18,22 @@
package org.apache.flink.graph.scala
import org.apache.flink.util.Preconditions
import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.{tuple => jtuple}
import org.apache.flink.api.scala._
import org.apache.flink.graph._
import org.apache.flink.graph.asm.translate.TranslateFunction
import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration}
import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
import org.apache.flink.graph.validation.GraphValidator
import org.apache.flink.types.{LongValue, NullValue}
import org.apache.flink.util.Preconditions
import org.apache.flink.{graph => jg}
import _root_.scala.collection.JavaConverters._
import _root_.scala.reflect.ClassTag
import org.apache.flink.types.NullValue
import org.apache.flink.graph.pregel.ComputeFunction
import org.apache.flink.graph.pregel.MessageCombiner
import org.apache.flink.graph.pregel.VertexCentricConfiguration
object Graph {
......@@ -803,8 +801,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*
* @return A DataSet of Tuple2<vertexId, inDegree>
*/
def inDegrees(): DataSet[(K, Long)] = {
wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
def inDegrees(): DataSet[(K, LongValue)] = {
wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
}
/**
......@@ -812,8 +810,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*
* @return A DataSet of Tuple2<vertexId, outDegree>
*/
def outDegrees(): DataSet[(K, Long)] = {
wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
def outDegrees(): DataSet[(K, LongValue)] = {
wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
}
/**
......@@ -821,8 +819,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
*
* @return A DataSet of Tuple2<vertexId, degree>
*/
def getDegrees(): DataSet[(K, Long)] = {
wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
def getDegrees(): DataSet[(K, LongValue)] = {
wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
}
/**
......
......@@ -61,6 +61,7 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
import org.apache.flink.graph.validation.GraphValidator;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
......@@ -867,25 +868,31 @@ public class Graph<K, VV, EV> {
*
* @return A DataSet of {@code Tuple2<vertexId, outDegree>}
*/
public DataSet<Tuple2<K, Long>> outDegrees() {
public DataSet<Tuple2<K, LongValue>> outDegrees() {
return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
}
private static final class CountNeighborsCoGroup<K, VV, EV>
implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, LongValue>> {
private LongValue degree = new LongValue();
private Tuple2<K, LongValue> vertexDegree = new Tuple2<>(null, degree);
@SuppressWarnings("unused")
public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Edge<K, EV>> outEdges,
Collector<Tuple2<K, Long>> out) {
Collector<Tuple2<K, LongValue>> out) {
long count = 0;
for (Edge<K, EV> edge : outEdges) {
count++;
}
degree.setValue(count);
Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
if(vertexIterator.hasNext()) {
out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count));
vertexDegree.f0 = vertexIterator.next().f0;
out.collect(vertexDegree);
} else {
throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
}
......@@ -897,7 +904,7 @@ public class Graph<K, VV, EV> {
*
* @return A DataSet of {@code Tuple2<vertexId, inDegree>}
*/
public DataSet<Tuple2<K, Long>> inDegrees() {
public DataSet<Tuple2<K, LongValue>> inDegrees() {
return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
}
......@@ -907,7 +914,7 @@ public class Graph<K, VV, EV> {
*
* @return A DataSet of {@code Tuple2<vertexId, degree>}
*/
public DataSet<Tuple2<K, Long>> getDegrees() {
public DataSet<Tuple2<K, LongValue>> getDegrees() {
return outDegrees().union(inDegrees()).groupBy(0).sum(1);
}
......
......@@ -29,6 +29,7 @@ import org.apache.flink.graph.gsa.GSAConfiguration;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.types.LongValue;
/**
* This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
......@@ -56,8 +57,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
......@@ -114,10 +114,10 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
}
@SuppressWarnings("serial")
private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
public Double edgeJoin(Double edgeValue, Long inputValue) {
return edgeValue / (double) inputValue;
public Double edgeJoin(Double edgeValue, LongValue inputValue) {
return edgeValue / (double) inputValue.getValue();
}
}
}
......@@ -29,6 +29,7 @@ import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.types.LongValue;
/**
* This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
......@@ -56,8 +57,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
......@@ -118,10 +118,10 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
}
@SuppressWarnings("serial")
private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
public Double edgeJoin(Double edgeValue, Long inputValue) {
return edgeValue / (double) inputValue;
public Double edgeJoin(Double edgeValue, LongValue inputValue) {
return edgeValue / (double) inputValue.getValue();
}
}
......
......@@ -309,28 +309,28 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@SuppressWarnings("serial")
private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, LongValue, LongValue>, Message> vertexUpdateFunction,
TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> resultType) {
super(vertexUpdateFunction, resultType);
}
@Override
public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex,
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexIter = vertex.iterator();
if (vertexIter.hasNext()) {
Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = vertexIter.next();
@SuppressWarnings("unchecked")
Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
messageIter.setSource(downcastIter);
vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
......@@ -420,7 +420,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
@SuppressWarnings("serial")
private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
extends MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
......@@ -430,19 +430,19 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@Override
public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
Collector<Tuple2<K, Message>> out) throws Exception {
final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
if (stateIter.hasNext()) {
Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
nextVertex.setField(vertexWithDegrees.f0, 0);
nextVertex.setField(vertexWithDegrees.f1.f0, 1);
messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
messagingFunction.sendMessages(nextVertex);
......@@ -505,13 +505,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
* @return the messaging function
*/
private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration,
TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
DataSet<LongValue> numberOfVertices) {
// build the messaging function (co group)
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger =
new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
......@@ -626,34 +626,34 @@ public class ScatterGatherIteration<K, VV, Message, EV>
this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees();
DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees();
DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
.with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
DataSet<Tuple3<K, LongValue, LongValue>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
.with(new FlatJoinFunction<Tuple2<K, LongValue>, Tuple2<K, LongValue>, Tuple3<K, LongValue, LongValue>>() {
@Override
public void join(Tuple2<K, Long> first, Tuple2<K, Long> second, Collector<Tuple3<K, Long, Long>> out) {
out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
public void join(Tuple2<K, LongValue> first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> out) {
out.collect(new Tuple3<K, LongValue, LongValue>(first.f0, first.f1, second.f1));
}
}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
DataSet<Vertex<K, Tuple3<VV, LongValue, LongValue>>> verticesWithDegrees = initialVertices
.join(degrees).where(0).equalTo(0)
.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() {
@Override
public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
out.collect(new Vertex<K, Tuple3<VV, LongValue, LongValue>>(vertex.getId(),
new Tuple3<VV, LongValue, LongValue>(vertex.getValue(), degrees.f1, degrees.f2)));
}
}).withForwardedFieldsFirst("f0");
// add type info
TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexTypes = verticesWithDegrees.getType();
final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
final DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration =
verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
setUpIteration(iteration);
......@@ -673,11 +673,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
}
@SuppressWarnings({ "unchecked", "rawtypes" })
VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf =
new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
// build the update function (co group)
CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, LongValue>>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
if (this.configuration != null && this.configuration.isOptNumVertices()) {
......@@ -687,9 +687,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
configureUpdateFunction(updates);
return iteration.closeWith(updates, updates).map(
new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
new MapFunction<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, VV>>() {
public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) {
return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
}
});
......
......@@ -56,10 +56,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(vertexCount*(vertexCount-1), graph.numberOfEdges());
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(vertexCount - 1, minInDegree);
assertEquals(vertexCount - 1, minOutDegree);
......
......@@ -55,10 +55,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(2 * vertexCount, graph.numberOfEdges());
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(2, minInDegree);
assertEquals(2, minOutDegree);
......
......@@ -54,8 +54,8 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(0, graph.numberOfEdges());
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(0, maxInDegree);
assertEquals(0, maxOutDegree);
......
......@@ -63,10 +63,10 @@ extends AbstractGraphTest {
assertEquals(2*3*5*7, graph.numberOfVertices());
assertEquals(7 * 2*3*5*7, graph.numberOfEdges());
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(7, minInDegree);
assertEquals(7, minOutDegree);
......
......@@ -57,10 +57,10 @@ extends AbstractGraphTest {
assertEquals(1L << dimensions, graph.numberOfVertices());
assertEquals(dimensions * (1L << dimensions), graph.numberOfEdges());
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(dimensions, minInDegree);
assertEquals(dimensions, minOutDegree);
......
......@@ -55,10 +55,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(1, minInDegree);
assertEquals(1, minOutDegree);
......
......@@ -56,10 +56,10 @@ extends AbstractGraphTest {
assertEquals(2 * vertexPairCount, graph.numberOfVertices());
assertEquals(2 * vertexPairCount, graph.numberOfEdges());
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(1, minInDegree);
assertEquals(1, minOutDegree);
......
......@@ -57,10 +57,10 @@ extends AbstractGraphTest {
assertEquals(vertexCount, graph.numberOfVertices());
assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
assertEquals(1, minInDegree);
assertEquals(1, minOutDegree);
......
......@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
import org.junit.runner.RunWith;
......@@ -50,8 +51,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
List<Tuple2<Long,Long>> result= data.collect();
DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,2\n" +
......@@ -76,8 +77,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
List<Tuple2<Long,Long>> result= data.collect();
DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,3\n" +
"2,1\n" +
......@@ -99,8 +100,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
List<Tuple2<Long,Long>> result= data.collect();
DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees();
List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,1\n" +
"2,1\n" +
......@@ -120,8 +121,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
List<Tuple2<Long,Long>> result= data.collect();
DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees();
List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,0\n" +
"2,1\n" +
......@@ -142,8 +143,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Tuple2<Long,Long>> data =graph.getDegrees();
List<Tuple2<Long,Long>> result= data.collect();
DataSet<Tuple2<Long, LongValue>> data = graph.getDegrees();
List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,3\n" +
"2,2\n" +
......@@ -164,8 +165,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
Graph<Long, NullValue, Long> graph =
Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
List<Tuple2<Long,Long>> result= data.collect();
DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
List<Tuple2<Long, LongValue>> result = data.collect();
expectedResult = "1,2\n" +
"2,1\n" +
......
......@@ -28,6 +28,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.test.TestGraphUtils;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.types.LongValue;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
......@@ -81,7 +82,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
try {
graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.outDegrees() did not fail.");
......@@ -105,7 +106,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
try {
graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.inDegrees() did not fail.");
......@@ -129,7 +130,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
try {
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.getDegrees() did not fail.");
......@@ -153,7 +154,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
try {
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.getDegrees() did not fail.");
......@@ -177,7 +178,7 @@ public class DegreesWithExceptionITCase {
TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
try {
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
env.execute();
fail("graph.getDegrees() did not fail.");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册