提交 240e8895 编写于 作者: P Pieter-Jan Van Aeken 提交者: Stephan Ewen

[FLINK-1962] Add Gelly Scala API

This closes #1004
上级 d2e88ffd
......@@ -64,6 +64,10 @@ import scala.reflect.ClassTag
*/
class ExecutionEnvironment(javaEnv: JavaEnv) {
/**
* @return the Java Execution environment.
*/
def getJavaEnv: JavaEnv = javaEnv
/**
* Gets the config object.
*/
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>flink-staging</artifactId>
<groupId>org.apache.flink</groupId>
<version>0.10-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>flink-gelly-scala</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-gelly</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.1.4</version>
<executions>
<!-- Run scala compiler in the process-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) compile phase -->
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
<jvmArg>-Xms128m</jvmArg>
<jvmArg>-Xmx512m</jvmArg>
</jvmArgs>
<compilerPlugins combine.children="append">
<compilerPlugin>
<groupId>org.scalamacros</groupId>
<artifactId>paradise_${scala.version}</artifactId>
<version>${scala.macros.version}</version>
</compilerPlugin>
</compilerPlugins>
</configuration>
</plugin>
<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<version>2.8</version>
<configuration>
<downloadSources>true</downloadSources>
<projectnatures>
<projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
<projectnature>org.eclipse.jdt.core.javanature</projectnature>
</projectnatures>
<buildcommands>
<buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<classpathContainers>
<classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
</classpathContainers>
<excludes>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
</excludes>
<sourceIncludes>
<sourceInclude>**/*.scala</sourceInclude>
<sourceInclude>**/*.java</sourceInclude>
</sourceIncludes>
</configuration>
</plugin>
<!-- Adding scala source directories to build path -->
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<!-- Add src/main/scala to eclipse build path -->
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>src/main/scala</source>
</sources>
</configuration>
</execution>
<!-- Add src/test/scala to eclipse build path -->
<execution>
<id>add-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/test/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.5.0</version>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.graph.Edge
import org.apache.flink.util.Collector
abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T])
override def iterateEdges(edges: java.lang.Iterable[Tuple2[K, Edge[K, EV]]], out:
Collector[T]): Unit = {
val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(edges)
.map(jtuple => (jtuple.f0, jtuple.f1))
iterateEdges(scalaIterable, out)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.util.Collector
abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] extends org.apache.flink.graph
.EdgesFunctionWithVertexValue[K, VV, EV, T] {
@throws(classOf[Exception])
def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T])
override def iterateEdges(v: Vertex[K, VV], edges: java.lang.Iterable[Edge[K, EV]], out:
Collector[T]) = {
iterateEdges(v, scala.collection.JavaConversions.iterableAsScalaIterable(edges), out)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala
import org.apache.flink.api.java.tuple.Tuple3
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.util.Collector
abstract class NeighborsFunction[K, VV, EV, T] extends org.apache.flink.graph
.NeighborsFunction[K, VV, EV, T] {
def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T])
override def iterateNeighbors(neighbors: java.lang.Iterable[Tuple3[K, Edge[K, EV], Vertex[K,
VV]]], out: Collector[T]) = {
val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
.map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
iterateNeighbors(scalaIterable, out)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala
import java.lang
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.util.Collector
abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends org.apache.flink.graph
.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])
], out: Collector[T]): Unit
override def iterateNeighbors(vertex: Vertex[K, VV], neighbors: lang.Iterable[Tuple2[Edge[K,
EV], Vertex[K, VV]]], out: Collector[T]): Unit = {
val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
.map(jtuple => (jtuple.f0, jtuple.f1))
iterateNeighbors(vertex, scalaIterable, out)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.graph.{Graph => JGraph}
import _root_.scala.reflect.ClassTag
package object scala {
private[flink] def wrapGraph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
EV: TypeInformation : ClassTag](javagraph: JGraph[K, VV, EV]) = new Graph[K, VV, EV](javagraph)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.utils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.Edge
class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
private val serialVersionUID: Long = 1L
override def map(value: Edge[K, EV]): (K, K, EV) = {
(value.getSource, value.getTarget, value.getValue)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.utils
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.Vertex
class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
private val serialVersionUID: Long = 1L
override def map(value: Vertex[K, VV]): (K, VV) = {
(value.getId, value.getValue)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test
import org.apache.flink.api.scala._
import org.apache.flink.graph.{Edge, Vertex}
object TestGraphUtils {
def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
return env.fromCollection(getLongLongVertices)
}
def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
return env.fromCollection(getLongLongEdges)
}
def getLongLongVertices: List[Vertex[Long, Long]] = {
List(
new Vertex[Long, Long](1L, 1L),
new Vertex[Long, Long](2L, 2L),
new Vertex[Long, Long](3L, 3L),
new Vertex[Long, Long](4L, 4L),
new Vertex[Long, Long](5L, 5L)
)
}
def getLongLongEdges: List[Edge[Long, Long]] = {
List(
new Edge[Long, Long](1L, 2L, 12L),
new Edge[Long, Long](1L, 3L, 13L),
new Edge[Long, Long](2L, 3L, 23L),
new Edge[Long, Long](3L, 4L, 34L),
new Edge[Long, Long](3L, 5L, 35L),
new Edge[Long, Long](4L, 5L, 45L),
new Edge[Long, Long](5L, 1L, 51L)
)
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testInDegrees {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.inDegrees().writeAsCsv(resultPath)
env.execute
expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n"
}
@Test
@throws(classOf[Exception])
def testOutDegrees {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.outDegrees().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n"
}
@Test
@throws(classOf[Exception])
def testGetDegrees {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.getDegrees().writeAsCsv(resultPath)
env.execute
expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n"
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testAddVertex {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
env.execute
expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
}
@Test
@throws(classOf[Exception])
def testAddVertexExisting {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
env.execute
expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
}
@Test
@throws(classOf[Exception])
def testAddVertexNoEdges {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
env.execute
expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
}
@Test
@throws(classOf[Exception])
def testRemoveVertex {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
}
@Test
@throws(classOf[Exception])
def testRemoveInvalidVertex {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n"
}
@Test
@throws(classOf[Exception])
def testAddEdge {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
1L), 61L)
newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n" + "6,1,61\n"
}
@Test
@throws(classOf[Exception])
def testAddExistingEdge {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
2L), 12L)
newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
"35\n" + "4,5,45\n" + "5,1,51\n"
}
@Test
@throws(classOf[Exception])
def testRemoveEdge {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
}
@Test
@throws(classOf[Exception])
def testRemoveInvalidEdge {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n"
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.flink.api.common.functions.FilterFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testUndirected {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.getUndirected().getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
"23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
"5,1,51\n" + "1,5,51\n"
}
@Test
@throws(classOf[Exception])
def testReverse {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.reverse().getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
"45\n" + "1,5,51\n"
}
@Test
@throws(classOf[Exception])
def testSubGraph {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
@throws(classOf[Exception])
def filter(vertex: Vertex[Long, Long]): Boolean = {
return (vertex.getValue > 2)
}
}, new FilterFunction[Edge[Long, Long]] {
@throws(classOf[Exception])
override def filter(edge: Edge[Long, Long]): Boolean = {
return (edge.getValue > 34)
}
}).getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "3,5,35\n" + "4,5,45\n"
}
@Test
@throws(classOf[Exception])
def testSubGraphSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.subgraph(
vertex => vertex.getValue > 2,
edge => edge.getValue > 34
).getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "3,5,35\n" + "4,5,45\n"
}
@Test
@throws(classOf[Exception])
def testFilterOnVertices {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
@throws(classOf[Exception])
def filter(vertex: Vertex[Long, Long]): Boolean = {
vertex.getValue > 2
}
}).getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
}
@Test
@throws(classOf[Exception])
def testFilterOnVerticesSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.filterOnVertices(
vertex => vertex.getValue > 2
).getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
}
@Test
@throws(classOf[Exception])
def testFilterOnEdges {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
@throws(classOf[Exception])
def filter(edge: Edge[Long, Long]): Boolean = {
edge.getValue > 34
}
}).getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
}
@Test
@throws(classOf[Exception])
def testFilterOnEdgesSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.filterOnEdges(
edge => edge.getValue > 34
).getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
}
@Test
@throws(classOf[Exception])
def testNumberOfVertices {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
env.fromElements(graph.numberOfVertices).writeAsText(resultPath)
env.execute
expectedResult = "5"
}
@Test
@throws(classOf[Exception])
def testNumberOfEdges {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
env.fromElements(graph.numberOfEdges).writeAsText(resultPath)
env.execute
expectedResult = "7"
}
@Test
@throws(classOf[Exception])
def testVertexIds {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.getVertexIds.writeAsText(resultPath)
env.execute
expectedResult = "1\n2\n3\n4\n5\n"
}
@Test
@throws(classOf[Exception])
def testEdgesIds {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.getEdgeIds.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n"
}
@Test
@throws(classOf[Exception])
def testUnion {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
new Vertex[Long, Long](6L, 6L)
)
val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
new Edge[Long, Long](6L, 1L, 61L)
)
val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
"45\n" + "5,1,51\n" + "6,1,61\n"
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testWithEdgesInputDataset {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
result.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"90\n" + "5,1,102\n"
}
@Test
@throws(classOf[Exception])
def testWithEdgesInputDatasetSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
result.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"90\n" + "5,1,102\n"
}
@Test
@throws(classOf[Exception])
def testWithEdgesOnSource {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
.map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
result.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
"90\n" + "5,1,102\n"
}
@Test
@throws(classOf[Exception])
def testWithEdgesOnSourceSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
.map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
result.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
"90\n" + "5,1,102\n"
}
@Test
@throws(classOf[Exception])
def testWithEdgesOnTarget {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
.map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
result.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"80\n" + "5,1,102\n"
}
@Test
@throws(classOf[Exception])
def testWithEdgesOnTargetSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
.map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
originalValue + tupleValue)
result.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
"80\n" + "5,1,102\n"
}
final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
@throws(classOf[Exception])
def map(tuple: (Long, Long)): Long = {
tuple._1 + tuple._2
}
}
final class ProjectSourceAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
@throws(classOf[Exception])
def map(edge: Edge[Long, Long]): (Long, Long) = {
(edge.getSource, edge.getValue)
}
}
final class ProjectTargetAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
@throws(classOf[Exception])
def map(edge: Edge[Long, Long]): (Long, Long) = {
(edge.getTarget, edge.getValue)
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.utils.VertexToTuple2Map
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testJoinWithVertexSet {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
VertexToTuple2Map[Long, Long]), new AddValuesMapper)
result.getVerticesAsTuple2().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
}
@Test
@throws(classOf[Exception])
def testJoinWithVertexSetSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
(originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
result.getVerticesAsTuple2().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
}
final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
@throws(classOf[Exception])
def map(tuple: (Long, Long)): Long = {
tuple._1 + tuple._2
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testWithSameValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.mapEdges(new AddOneMapper)
.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,13\n" +
"1,3,14\n" + "" +
"2,3,24\n" +
"3,4,35\n" +
"3,5,36\n" +
"4,5,46\n" +
"5,1,52\n"
}
@Test
@throws(classOf[Exception])
def testWithSameValueSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.mapEdges(edge => edge.getValue + 1)
.getEdgesAsTuple3().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2,13\n" +
"1,3,14\n" + "" +
"2,3,24\n" +
"3,4,35\n" +
"3,5,36\n" +
"4,5,46\n" +
"5,1,52\n"
}
final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
@throws(classOf[Exception])
def map(edge: Edge[Long, Long]): Long = {
edge.getValue + 1
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Vertex
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testWithSameValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.mapVertices(new AddOneMapper)
.getVerticesAsTuple2().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,6\n";
}
@Test
@throws(classOf[Exception])
def testWithSameValueSugar {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.mapVertices(vertex => vertex.getValue + 1)
.getVerticesAsTuple2().writeAsCsv(resultPath)
env.execute
expectedResult = "1,2\n" +
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,6\n";
}
final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
@throws(classOf[Exception])
def map(vertex: Vertex[Long, Long]): Long = {
vertex.getValue + 1
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.scala._
import org.apache.flink.graph._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
extends MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testAllNeighborsWithValueGreaterThanFour {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
EdgeDirection.ALL)
result.writeAsCsv(resultPath)
env.execute
expectedResult = "5,1\n" + "5,3\n" + "5,4"
}
@Test
@throws(classOf[Exception])
def testAllNeighbors {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
result.writeAsCsv(resultPath)
env.execute
expectedResult = "1,2\n" + "1,3\n" + "1,5\n" + "2,1\n" + "2,3\n" + "3,1\n" + "3,2\n" +
"3,4\n" + "3,5\n" + "4,3\n" + "4,5\n" + "5,1\n" + "5,3\n" + "5,4"
}
@Test
@throws(classOf[Exception])
def testLowestWeightOutNeighborNoValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
env.execute
expectedResult = "1,12\n" + "2,23\n" + "3,34\n" + "4,45\n" + "5,51\n"
}
@Test
@throws(classOf[Exception])
def testLowestWeightInNeighborNoValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMinWeightNeighborNoValue, EdgeDirection.IN)
verticesWithLowestOutNeighbor.writeAsCsv(resultPath)
env.execute
expectedResult = "1,51\n" + "2,12\n" + "3,13\n" + "4,34\n" + "5,35\n"
}
@Test
@throws(classOf[Exception])
def testMaxWeightAllNeighbors {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
verticesWithMaxEdgeWeight.writeAsCsv(resultPath)
env.execute
expectedResult = "1,51\n" + "2,23\n" + "3,35\n" + "4,45\n" + "5,51\n"
}
final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
Long, Long, (Long, Long)] {
@throws(classOf[Exception])
override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out:
Collector[(Long, Long)]): Unit = {
for (edge <- edges) {
if (v.getValue > 4) {
if (v.getId == edge.getTarget) {
out.collect((v.getId, edge.getSource))
}
else {
out.collect((v.getId, edge.getTarget))
}
}
}
}
}
final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] {
@throws(classOf[Exception])
override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])], out: Collector[
(Long, Long)]) {
for (edge <- edges) {
if (edge._1.equals(edge._2.getTarget)) {
out.collect(new Tuple2[Long, Long](edge._1, edge._2.getSource))
}
else {
out.collect(new Tuple2[Long, Long](edge._1, edge._2.getTarget))
}
}
}
}
final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = {
Math.min(firstEdgeValue, secondEdgeValue)
}
}
final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = {
Math.max(firstEdgeValue, secondEdgeValue)
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
@RunWith(classOf[Parameterized])
class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
extends MultipleProgramsTestBase(mode) {
private var resultPath: String = null
private var expectedResult: String = null
var tempFolder: TemporaryFolder = new TemporaryFolder()
@Rule
def getFolder(): TemporaryFolder = {
tempFolder;
}
@Before
@throws(classOf[Exception])
def before {
resultPath = tempFolder.newFile.toURI.toString
}
@After
@throws(classOf[Exception])
def after {
TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
}
@Test
@throws(classOf[Exception])
def testSumOfAllNeighborsNoValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL).writeAsCsv(resultPath)
env.execute
expectedResult = "1,10\n" + "2,4\n" + "3,12\n" + "4,8\n" + "5,8\n"
}
@Test
@throws(classOf[Exception])
def testSumOfOutNeighborsNoValue {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).writeAsCsv(resultPath)
env.execute
expectedResult = "1,5\n" + "2,3\n" + "3,9\n" + "4,5\n" + "5,1\n"
}
@Test
@throws(classOf[Exception])
def testSumOfAllNeighbors {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
result.writeAsCsv(resultPath)
env.execute
expectedResult = "1,11\n" + "2,6\n" + "3,15\n" + "4,12\n" + "5,13\n"
}
@Test
@throws(classOf[Exception])
def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo = {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val result = graph.groupReduceOnNeighbors(new
SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
result.writeAsCsv(resultPath)
env.execute
expectedResult = "3,59\n" + "3,118\n" + "4,204\n" + "4,102\n" + "5,570\n" + "5,285"
}
final class SumNeighbors extends ReduceNeighborsFunction[Long] {
override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long = {
firstNeighbor + secondNeighbor
}
}
final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long,
Long)] {
@throws(classOf[Exception])
def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long],
Vertex[Long, Long])], out: Collector[(Long, Long)]) {
var sum: Long = 0
for (neighbor <- neighbors) {
sum += neighbor._2.getValue
}
out.collect((vertex.getId, sum + vertex.getValue))
}
}
final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends
NeighborsFunction[Long, Long, Long, (Long, Long)] {
@throws(classOf[Exception])
def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])],
out: Collector[(Long, Long)]) {
var sum: Long = 0
var next: (Long, Edge[Long, Long], Vertex[Long, Long]) = null
val neighborsIterator: Iterator[(Long, Edge[Long, Long], Vertex[Long, Long])] =
neighbors.iterator
while (neighborsIterator.hasNext) {
next = neighborsIterator.next
sum += next._3.getValue * next._2.getValue
}
if (next._1 > 2) {
out.collect(new Tuple2[Long, Long](next._1, sum))
out.collect(new Tuple2[Long, Long](next._1, sum * 2))
}
}
}
}
......@@ -383,6 +383,17 @@ public class Graph<K, VV, EV> {
TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
Vertex.class, keyType, valueType);
return mapVertices(mapper, returnType);
}
/**
* Apply a function to the attribute of each vertex in the graph.
*
* @param mapper the map function to apply.
* @param returnType the explicit return type.
* @return a new graph
*/
public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
......@@ -411,6 +422,18 @@ public class Graph<K, VV, EV> {
TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
Edge.class, keyType, keyType, valueType);
return mapEdges(mapper, returnType);
}
/**
* Apply a function to the attribute of each edge in the graph.
*
* @param mapper the map function to apply.
* @param returnType the explicit return type.
* @return a new graph
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
DataSet<Edge<K, NV>> mappedEdges = edges.map(
new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
......@@ -752,6 +775,38 @@ public class Graph<K, VV, EV> {
}
}
/**
* Compute an aggregate over the edges of each vertex. The function applied
* on the edges has access to the vertex value.
*
* @param edgesFunction
* the function to apply to the neighborhood
* @param direction
* the edge direction (in-, out-, all-)
* @param <T>
* the output type
* @param typeInfo the explicit return type.
* @return a dataset of a T
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
return vertices.coGroup(edges).where(0).equalTo(1)
.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
case OUT:
return vertices.coGroup(edges).where(0).equalTo(0)
.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
case ALL:
return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}
/**
* Compute an aggregate over the edges of each vertex. The function applied
* on the edges only has access to the vertex id (not the vertex value).
......@@ -785,6 +840,40 @@ public class Graph<K, VV, EV> {
}
}
/**
* Compute an aggregate over the edges of each vertex. The function applied
* on the edges only has access to the vertex id (not the vertex value).
*
* @param edgesFunction
* the function to apply to the neighborhood
* @param direction
* the edge direction (in-, out-, all-)
* @param <T>
* the output type
* @param typeInfo the explicit return type.
* @return a dataset of T
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
return edges.map(new ProjectVertexIdMap<K, EV>(1))
.withForwardedFields("f1->f0")
.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
case OUT:
return edges.map(new ProjectVertexIdMap<K, EV>(0))
.withForwardedFields("f0")
.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
case ALL:
return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}
private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
......@@ -1410,6 +1499,51 @@ public class Graph<K, VV, EV> {
}
}
/**
* Compute an aggregate over the neighbors (edges and vertices) of each
* vertex. The function applied on the neighbors has access to the vertex
* value.
*
* @param neighborsFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @param <T> the output type
* @param typeInfo the explicit return type.
* @return a dataset of a T
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
.join(this.vertices).where(0).equalTo(0);
return vertices.coGroup(edgesWithSources)
.where(0).equalTo("f0.f1")
.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
.join(this.vertices).where(1).equalTo(0);
return vertices.coGroup(edgesWithTargets)
.where(0).equalTo("f0.f0")
.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
return vertices.coGroup(edgesWithNeighbors)
.where(0).equalTo(0)
.with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}
/**
* Compute an aggregate over the neighbors (edges and vertices) of each
* vertex. The function applied on the neighbors only has access to the
......@@ -1454,6 +1588,51 @@ public class Graph<K, VV, EV> {
}
}
/**
* Compute an aggregate over the neighbors (edges and vertices) of each
* vertex. The function applied on the neighbors only has access to the
* vertex id (not the vertex value).
*
* @param neighborsFunction the function to apply to the neighborhood
* @param direction the edge direction (in-, out-, all-)
* @param <T> the output type
* @param typeInfo the explicit return type.
* @return a dataset of a T
* @throws IllegalArgumentException
*/
public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
switch (direction) {
case IN:
// create <edge-sourceVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
.join(this.vertices).where(0).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(1))
.withForwardedFieldsFirst("f1->f0");
return edgesWithSources.groupBy(0).reduceGroup(
new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
case OUT:
// create <edge-targetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectVertexIdJoin<K, VV, EV>(0))
.withForwardedFieldsFirst("f0");
return edgesWithTargets.groupBy(0).reduceGroup(
new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
case ALL:
// create <edge-sourceOrTargetVertex> pairs
DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
.join(this.vertices).where(1).equalTo(0)
.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
return edgesWithNeighbors.groupBy(0).reduceGroup(
new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
default:
throw new IllegalArgumentException("Illegal edge direction");
}
}
private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
......
......@@ -46,6 +46,7 @@ under the License.
<module>flink-table</module>
<module>flink-ml</module>
<module>flink-language-binding</module>
<module>flink-gelly-scala</module>
</modules>
<!-- See main pom.xml for explanation of profiles -->
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册