提交 ee648925 编写于 作者: S smarthi 提交者: Till Rohrmann

[FLINK-3013] [gelly] Incorrect package declaration in GellyScalaAPICompletenessTest.scala

This closes #1356.
上级 5d37fe14
......@@ -170,7 +170,7 @@ object Graph {
/**
* Creates a Graph with from a CSV file of vertices and a CSV file of edges
*
* @param The Execution Environment.
* @param env Execution Environment.
* @param pathEdges The file path containing the edges.
* @param readVertices Defines whether the vertices have associated values.
* If set to false, the vertex input is ignored and vertices are created from the edges file.
......@@ -868,7 +868,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* Adds the list of vertices, passed as input, to the graph.
* If the vertices already exist in the graph, they will not be added once more.
*
* @param verticesToAdd the list of vertices to add
* @param vertices the list of vertices to add
* @return the new graph containing the existing and newly added vertices
*/
def addVertices(vertices: List[Vertex[K, VV]]): Graph[K, VV, EV] = {
......@@ -881,7 +881,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* When adding an edge for a non-existing set of vertices,
* the edge is considered invalid and ignored.
*
* @param newEdges the data set of edges to be added
* @param edges the data set of edges to be added
* @return a new graph containing the existing edges plus the newly added edges.
*/
def addEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
......@@ -916,7 +916,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
/**
* Removes the given vertex and its edges from the graph.
*
* @param vertex the vertex to remove
* @param vertices list of vertices to remove
* @return the new graph containing the existing vertices and edges without
* the removed vertex and its edges
*/
......@@ -938,7 +938,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
/**
* Removes all the edges that match the edges in the given data set from the graph.
*
* @param edgesToBeRemoved the list of edges to be removed
* @param edges the list of edges to be removed
* @return a new graph where the edges have been removed and in which the vertices remained intact
*/
def removeEdges(edges: List[Edge[K, EV]]): Graph[K, VV, EV] = {
......@@ -1016,7 +1016,7 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* The {@link ReduceNeighborsFunction} combines a pair of neighbor vertex values
* into one new value of the same type.
*
* @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
* @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
* @param direction the edge direction (in-, out-, all-)
* @return a Dataset of Tuple2, with one tuple per vertex.
* The first field of the Tuple2 is the vertex ID and the second field
......
......@@ -16,7 +16,7 @@
* limitations under the License.
*/
package org.apache.flink.graph.scala.example;
package org.apache.flink.graph.scala.example
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
......@@ -32,7 +32,7 @@ import java.lang.Long
* You can find all available library methods in [[org.apache.flink.graph.library]].
*
* In particular, this example uses the
* [[org.apache.flink.graph.library.ConnectedComponentsAlgorithm.GSAConnectedComponents]]
* [[org.apache.flink.graph.library.GSAConnectedComponents]]
* library method to compute the connected components of the input graph.
*
* The input file is a plain text file and must be formatted as follows:
......@@ -70,7 +70,7 @@ object ConnectedComponents {
}
private final class InitVertices extends MapFunction[Long, Long] {
override def map(id: Long) = {id}
override def map(id: Long) = id
}
// ***********************************************************************
......@@ -87,19 +87,18 @@ object ConnectedComponents {
if(args.length != 3) {
System.err.println("Usage ConnectedComponents <edge path> <output path> " +
"<num iterations>")
false
}
fileOutput = true
edgesInputPath = args(0)
outputPath = args(1)
maxIterations = (2).toInt
maxIterations = 2
} else {
System.out.println("Executing ConnectedComponents example with default parameters" +
" and built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println("Usage ConnectedComponents <edge path> <output path> " +
"<num iterations>");
"<num iterations>")
}
true
}
......
......@@ -16,20 +16,15 @@
* limitations under the License.
*/
package org.apache.flink.graph.scala.example;
package org.apache.flink.graph.scala.example
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.types.NullValue
import org.apache.flink.graph.Edge
import org.apache.flink.api.common.functions.MapFunction
import scala.collection.JavaConversions._
import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
import org.apache.flink.graph.gsa.GatherFunction
import org.apache.flink.graph.gsa.Neighbor
import org.apache.flink.graph.gsa.SumFunction
import org.apache.flink.graph.gsa.ApplyFunction
import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
/**
* This example shows how to use Gelly's gather-sum-apply iterations.
......@@ -121,20 +116,19 @@ object GSASingleSourceShortestPaths {
if(args.length != 4) {
System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
" <input edges path> <output path> <num iterations>")
false
}
fileOutput = true
srcVertexId = args(0).toLong
edgesInputPath = args(1)
outputPath = args(2)
maxIterations = (3).toInt
maxIterations = 3
} else {
System.out.println("Executing Single Source Shortest Paths example "
+ "with default parameters and built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
" <input edges path> <output path> <num iterations>");
" <input edges path> <output path> <num iterations>")
}
true
}
......
......@@ -53,13 +53,13 @@ object GraphMetrics {
val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
/** get the number of vertices **/
val numVertices = graph.numberOfVertices;
val numVertices = graph.numberOfVertices
/** get the number of edges **/
val numEdges = graph.numberOfEdges;
val numEdges = graph.numberOfEdges
/** compute the average node degree **/
val verticesWithDegrees = graph.getDegrees;
val verticesWithDegrees = graph.getDegrees
val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
/** find the vertex with the maximum in-degree **/
......@@ -114,7 +114,7 @@ object GraphMetrics {
(key: Long, out: Collector[Edge[Long, NullValue]]) => {
val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
for ( i <- 0 to numOutEdges ) {
var target: Long = ((Math.random() * numVertices) + 1).toLong
val target: Long = ((Math.random() * numVertices) + 1).toLong
new Edge[Long, NullValue](key, target, NullValue.getInstance())
}
})
......
......@@ -16,11 +16,10 @@
* limitations under the License.
*/
package org.apache.flink.graph.scala.example;
package org.apache.flink.graph.scala.example
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.types.NullValue
import org.apache.flink.graph.Edge
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.graph.spargel.VertexUpdateFunction
......@@ -95,7 +94,7 @@ object SingleSourceShortestPaths {
override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
var minDistance = Double.MaxValue
while (inMessages.hasNext) {
var msg = inMessages.next
val msg = inMessages.next
if (msg < minDistance) {
minDistance = msg
}
......@@ -115,7 +114,7 @@ object SingleSourceShortestPaths {
override def sendMessages(vertex: Vertex[Long, Double]) {
for (edge: Edge[Long, Double] <- getEdges) {
sendMessageTo(edge.getTarget(), vertex.getValue + edge.getValue)
sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
}
}
}
......@@ -135,20 +134,19 @@ object SingleSourceShortestPaths {
if(args.length != 4) {
System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
" <input edges path> <output path> <num iterations>")
false
}
fileOutput = true
srcVertexId = args(0).toLong
edgesInputPath = args(1)
outputPath = args(2)
maxIterations = (3).toInt
maxIterations = 3
} else {
System.out.println("Executing Single Source Shortest Paths example "
+ "with default parameters and built-in default data.")
System.out.println(" Provide parameters to read input data from files.")
System.out.println(" See the documentation for the correct format of input files.")
System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
" <input edges path> <output path> <num iterations>");
" <input edges path> <output path> <num iterations>")
}
true
}
......
......@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
package org.apache.flink.graph.scala.test
import java.lang.reflect.Method
import org.apache.flink.graph.scala._
......
......@@ -24,11 +24,11 @@ import org.apache.flink.graph.{Edge, Vertex}
object TestGraphUtils {
def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
return env.fromCollection(getLongLongVertices)
env.fromCollection(getLongLongVertices)
}
def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
return env.fromCollection(getLongLongEdges)
env.fromCollection(getLongLongEdges)
}
def getLongLongVertices: List[Vertex[Long, Long]] = {
......
......@@ -22,10 +22,10 @@ import org.apache.flink.api.scala._
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
......@@ -36,33 +36,33 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testInDegrees {
def testInDegrees() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.inDegrees.collect().toList
val res = graph.inDegrees().collect().toList
expectedResult = "(1,1)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,2)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testOutDegrees {
def testOutDegrees() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.outDegrees.collect().toList
val res = graph.outDegrees().collect().toList
expectedResult = "(1,2)\n" + "(2,1)\n" + "(3,2)\n" + "(4,1)\n" + "(5,1)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testGetDegrees {
def testGetDegrees() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.getDegrees.collect().toList
val res = graph.getDegrees().collect().toList
expectedResult = "(1,3)\n" + "(2,2)\n" + "(3,4)\n" + "(4,2)\n" + "(5,3)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
......
......@@ -18,25 +18,20 @@
package org.apache.flink.graph.scala.test.operations
import java.io.{File, FileOutputStream, IOException, OutputStreamWriter}
import com.google.common.base.Charsets
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.{FileInputSplit, Path}
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.apache.flink.types.NullValue
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
import java.io.IOException
import org.apache.flink.core.fs.FileInputSplit
import java.io.File
import java.io.OutputStreamWriter
import java.io.FileOutputStream
import java.io.FileOutputStream
import com.google.common.base.Charsets
import org.apache.flink.core.fs.Path
import org.apache.flink.types.NullValue
import org.apache.flink.api.common.functions.MapFunction
@RunWith(classOf[Parameterized])
class GraphCreationWithCsvITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
......@@ -46,7 +41,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testCsvWithValues {
def testCsvWithValues() {
/*
* Test with two Csv files, both vertices and edges have values
*/
......@@ -61,14 +56,14 @@ MultipleProgramsTestBase(mode) {
pathEdges = edgesSplit.getPath.toString,
env = env)
val result = graph.getTriplets.collect()
val result = graph.getTriplets().collect()
expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testCsvNoEdgeValues {
def testCsvNoEdgeValues() {
/*
* Test with two Csv files; edges have no values
*/
......@@ -84,14 +79,14 @@ MultipleProgramsTestBase(mode) {
hasEdgeValues = false,
env = env)
val result = graph.getTriplets.collect()
val result = graph.getTriplets().collect()
expectedResult = "1,2,one,two,(null)\n3,2,three,two,(null)\n3,1,three,one,(null)\n"
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testCsvWithMapperValues {
def testCsvWithMapperValues() {
/*
* Test with edges Csv file and vertex mapper initializer
*/
......@@ -104,14 +99,14 @@ MultipleProgramsTestBase(mode) {
vertexValueInitializer = new VertexDoubleIdAssigner(),
env = env)
val result = graph.getTriplets.collect()
val result = graph.getTriplets().collect()
expectedResult = "1,2,1.0,2.0,12\n3,2,3.0,2.0,32\n3,1,3.0,1.0,31\n"
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testCsvNoVertexValues {
def testCsvNoVertexValues() {
/*
* Test with edges Csv file: no vertex values
*/
......@@ -123,15 +118,15 @@ MultipleProgramsTestBase(mode) {
pathEdges = edgesSplit.getPath.toString,
env = env)
val result = graph.getTriplets.collect()
val result = graph.getTriplets().collect()
expectedResult = "1,2,(null),(null),12\n3,2,(null),(null),32\n" +
"3,1,(null),(null),31\n"
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testCsvNoValues {
def testCsvNoValues() {
/*
* Test with edges Csv file: neither vertex nor edge values
*/
......@@ -144,15 +139,15 @@ MultipleProgramsTestBase(mode) {
hasEdgeValues = false,
env = env)
val result = graph.getTriplets.collect()
val result = graph.getTriplets().collect()
expectedResult = "1,2,(null),(null),(null)\n" +
"3,2,(null),(null),(null)\n3,1,(null),(null),(null)\n"
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testCsvOptionsVertices {
def testCsvOptionsVertices() {
/*
* Test the options for vertices: delimiters, comments, ignore first line.
*/
......@@ -172,14 +167,14 @@ MultipleProgramsTestBase(mode) {
pathEdges = edgesSplit.getPath.toString,
env = env)
val result = graph.getTriplets.collect()
val result = graph.getTriplets().collect()
expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testCsvOptionsEdges {
def testCsvOptionsEdges() {
/*
* Test the options for edges: delimiters, comments, ignore first line.
*/
......@@ -199,9 +194,9 @@ MultipleProgramsTestBase(mode) {
pathEdges = edgesSplit.getPath.toString,
env = env)
val result = graph.getTriplets.collect()
val result = graph.getTriplets().collect()
expectedResult = "1,2,1,2,ot\n3,2,3,2,tt\n3,1,3,1,to\n"
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult);
TestBaseUtils.compareResultAsTuples(result.asJava, expectedResult)
}
@throws(classOf[IOException])
......@@ -214,7 +209,7 @@ MultipleProgramsTestBase(mode) {
wrt.close()
new FileInputSplit(0, new Path(tempFile.toURI.toString), 0, tempFile.length,
Array("localhost"));
Array("localhost"))
}
final class VertexDoubleIdAssigner extends MapFunction[Long, Double] {
......
......@@ -23,10 +23,10 @@ import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import org.junit.Test
import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
......@@ -37,7 +37,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddVertex {
def testAddVertex() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -50,7 +50,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddVertexExisting {
def testAddVertexExisting() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddVertexNoEdges {
def testAddVertexNoEdges() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -74,7 +74,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddVertices {
def testAddVertices() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -88,7 +88,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddVerticesExisting {
def testAddVerticesExisting() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -102,7 +102,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveVertex {
def testRemoveVertex() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -114,7 +114,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveInvalidVertex {
def testRemoveInvalidVertex() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -127,7 +127,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveVertices {
def testRemoveVertices() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -140,7 +140,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveValidAndInvalidVertex {
def testRemoveValidAndInvalidVertex() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -153,7 +153,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddEdge {
def testAddEdge() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -167,7 +167,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddEdges {
def testAddEdges() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -181,7 +181,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddEdgesInvalidVertices {
def testAddEdgesInvalidVertices() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -195,7 +195,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testAddExistingEdge {
def testAddExistingEdge() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -209,7 +209,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveEdge {
def testRemoveEdge() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -221,7 +221,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveInvalidEdge {
def testRemoveInvalidEdge() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -234,7 +234,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveEdges {
def testRemoveEdges() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -247,7 +247,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testRemoveSameEdgeTwiceEdges {
def testRemoveSameEdgeTwiceEdges() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......
......@@ -23,10 +23,9 @@ import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.{Edge, Vertex}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import org.junit.Test
import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
......@@ -37,11 +36,11 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testUndirected {
def testUndirected() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.getUndirected.getEdges.collect().toList;
val res = graph.getUndirected().getEdges.collect().toList
expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
"23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
......@@ -51,11 +50,11 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testReverse {
def testReverse() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.reverse().getEdges.collect().toList;
val res = graph.reverse().getEdges.collect().toList
expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
"45\n" + "1,5,51\n"
......@@ -64,22 +63,22 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testSubGraph {
def testSubGraph() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
@throws(classOf[Exception])
def filter(vertex: Vertex[Long, Long]): Boolean = {
return (vertex.getValue > 2)
vertex.getValue > 2
}
}, new FilterFunction[Edge[Long, Long]] {
@throws(classOf[Exception])
override def filter(edge: Edge[Long, Long]): Boolean = {
return (edge.getValue > 34)
edge.getValue > 34
}
}).getEdges.collect().toList;
}).getEdges.collect().toList
expectedResult = "3,5,35\n" + "4,5,45\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
......@@ -87,14 +86,14 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testSubGraphSugar {
def testSubGraphSugar() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.subgraph(
vertex => vertex.getValue > 2,
edge => edge.getValue > 34
).getEdges.collect().toList;
).getEdges.collect().toList
expectedResult = "3,5,35\n" + "4,5,45\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
......@@ -102,7 +101,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testFilterOnVertices {
def testFilterOnVertices() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -111,7 +110,7 @@ MultipleProgramsTestBase(mode) {
def filter(vertex: Vertex[Long, Long]): Boolean = {
vertex.getValue > 2
}
}).getEdges.collect().toList;
}).getEdges.collect().toList
expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
......@@ -119,13 +118,13 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testFilterOnVerticesSugar {
def testFilterOnVerticesSugar() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.filterOnVertices(
vertex => vertex.getValue > 2
).getEdges.collect().toList;
).getEdges.collect().toList
expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
......@@ -133,7 +132,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testFilterOnEdges {
def testFilterOnEdges() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -142,7 +141,7 @@ MultipleProgramsTestBase(mode) {
def filter(edge: Edge[Long, Long]): Boolean = {
edge.getValue > 34
}
}).getEdges.collect().toList;
}).getEdges.collect().toList
expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
......@@ -150,13 +149,13 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testFilterOnEdgesSugar {
def testFilterOnEdgesSugar() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.filterOnEdges(
edge => edge.getValue > 34
).getEdges.collect().toList;
).getEdges.collect().toList
expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
......@@ -164,44 +163,44 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testNumberOfVertices {
def testNumberOfVertices() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = env.fromElements(graph.numberOfVertices).collect().toList
val res = env.fromElements(graph.numberOfVertices()).collect().toList
expectedResult = "5"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testNumberOfEdges {
def testNumberOfEdges() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = env.fromElements(graph.numberOfEdges).collect().toList
val res = env.fromElements(graph.numberOfEdges()).collect().toList
expectedResult = "7"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testVertexIds {
def testVertexIds() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.getVertexIds.collect().toList
val res = graph.getVertexIds().collect().toList
expectedResult = "1\n2\n3\n4\n5\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
}
@Test
@throws(classOf[Exception])
def testEdgesIds {
def testEdgesIds() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.getEdgeIds.collect().toList
val res = graph.getEdgeIds().collect().toList
expectedResult = "(1,2)\n" + "(1,3)\n" + "(2,3)\n" + "(3,4)\n" + "(3,5)\n" + "(4,5)\n" +
"(5,1)\n"
TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
......@@ -209,7 +208,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testUnion {
def testUnion() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -229,7 +228,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testDifference {
def testDifference() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -250,7 +249,7 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testDifferenceNoCommonVertices {
def testDifferenceNoCommonVertices() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
......@@ -270,11 +269,11 @@ MultipleProgramsTestBase(mode) {
@Test
@throws(classOf[Exception])
def testTriplets {
def testTriplets() {
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
.getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
val res = graph.getTriplets.collect().toList
val res = graph.getTriplets().collect().toList
expectedResult = "1,2,1,2,12\n" + "1,3,1,3,13\n" + "2,3,2,3,23\n" + "3,4,3,4,34\n" +
"3,5,3,5,35\n" + "4,5,4,5,45\n" + "5,1,5,1,51\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
......
......@@ -20,17 +20,16 @@ package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.Edge
import org.apache.flink.graph.{Edge, EdgeJoinFunction}
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
import org.apache.flink.graph.EdgeJoinFunction
@RunWith(classOf[Parameterized])
class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
......
......@@ -18,18 +18,17 @@
package org.apache.flink.graph.scala.test.operations
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.scala._
import org.apache.flink.graph.VertexJoinFunction
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.utils.VertexToTuple2Map
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
import org.apache.flink.graph.VertexJoinFunction
@RunWith(classOf[Parameterized])
class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
......
......@@ -24,10 +24,10 @@ import org.apache.flink.graph.Edge
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
......
......@@ -24,10 +24,10 @@ import org.apache.flink.graph.Vertex
import org.apache.flink.graph.scala._
import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.junit.rules.TemporaryFolder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
......@@ -47,7 +47,7 @@ MultipleProgramsTestBase(mode) {
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,6\n";
"5,6\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
......@@ -62,7 +62,7 @@ MultipleProgramsTestBase(mode) {
"2,3\n" +
"3,4\n" +
"4,5\n" +
"5,6\n";
"5,6\n"
TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
}
......
......@@ -24,10 +24,10 @@ import org.apache.flink.graph.scala.test.TestGraphUtils
import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
......
......@@ -24,10 +24,10 @@ import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
import org.apache.flink.util.Collector
import org.junit.rules.TemporaryFolder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.{After, Before, Rule, Test}
import _root_.scala.collection.JavaConverters._
@RunWith(classOf[Parameterized])
......
......@@ -135,7 +135,7 @@ public class IncrementalSSSP implements ProgramDescription {
*
* @param edgeToBeRemoved
* @param edgesInSSSP
* @return
* @return true or false
*/
public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
......@@ -154,9 +154,7 @@ public class IncrementalSSSP implements ProgramDescription {
if (inMessages.hasNext()) {
Long outDegree = getOutDegree() - 1;
// check if the vertex has another SP-Edge
if (outDegree > 0) {
// there is another shortest path from the source to this vertex
} else {
if (outDegree <= 0) {
// set own value to infinity
setNewVertexValue(Double.MAX_VALUE);
}
......
......@@ -56,7 +56,7 @@ public class GSACompilerTest extends CompilerTestBase {
env.setParallelism(DEFAULT_PARALLELISM);
// compose test program
{
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<>(
1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
......@@ -124,7 +124,7 @@ public class GSACompilerTest extends CompilerTestBase {
public Long gather(Neighbor<Long, NullValue> neighbor) {
return neighbor.getNeighborValue();
}
};
}
@SuppressWarnings("serial")
private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
......@@ -132,7 +132,7 @@ public class GSACompilerTest extends CompilerTestBase {
public Long sum(Long newValue, Long currentValue) {
return Math.min(newValue, currentValue);
}
};
}
@SuppressWarnings("serial")
private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
......
......@@ -71,7 +71,7 @@ public class GSATranslationTest {
// ------------ construct the test program ------------------
{
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<>(
1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
......@@ -98,7 +98,7 @@ public class GSATranslationTest {
assertTrue(result instanceof DeltaIterationResultSet);
DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
// check the basic iteration properties
assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
......@@ -142,7 +142,7 @@ public class GSATranslationTest {
public Long gather(Neighbor<Long, NullValue> neighbor) {
return neighbor.getNeighborValue();
}
};
}
@SuppressWarnings("serial")
private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
......@@ -150,7 +150,7 @@ public class GSATranslationTest {
public Long sum(Long newValue, Long currentValue) {
return Math.min(newValue, currentValue);
}
};
}
@SuppressWarnings("serial")
private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
......
......@@ -61,14 +61,14 @@ public class SpargelCompilerTest extends CompilerTestBase {
{
DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
.map(new Tuple2ToVertexMap<Long, Long>());
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
}
});
......@@ -143,14 +143,14 @@ public class SpargelCompilerTest extends CompilerTestBase {
DataSet<Long> bcVar = env.fromElements(1L);
DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
new Tuple2<>(1L, 1L), new Tuple2<>(2L, 2L))
.map(new Tuple2ToVertexMap<Long, Long>());
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<>(1L, 2L))
.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
return new Edge<>(edge.f0, edge.f1, NullValue.getInstance());
}
});
......
......@@ -52,7 +52,7 @@ public class SpargelTranslationTest {
final String BC_SET_MESSAGES_NAME = "borat messages";
final String BC_SET_UPDATES_NAME = "borat updates";
;
final int NUM_ITERATIONS = 13;
final int ITERATION_parallelism = 77;
......@@ -68,16 +68,16 @@ public class SpargelTranslationTest {
// ------------ construct the test program ------------------
{
DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
public Tuple3<String, String, NullValue> map(
Tuple2<String, String> edge) {
return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
}
}), env);
......@@ -101,7 +101,7 @@ public class SpargelTranslationTest {
assertTrue(result instanceof DeltaIterationResultSet);
DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
// check the basic iteration properties
assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
......@@ -139,7 +139,7 @@ public class SpargelTranslationTest {
final String BC_SET_MESSAGES_NAME = "borat messages";
final String BC_SET_UPDATES_NAME = "borat updates";
;
final int NUM_ITERATIONS = 13;
final int ITERATION_parallelism = 77;
......@@ -154,16 +154,16 @@ public class SpargelTranslationTest {
// ------------ construct the test program ------------------
{
DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<String, Double>("abc", 3.44));
DataSet<Tuple2<String, Double>> initialVertices = env.fromElements(new Tuple2<>("abc", 3.44));
DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<String, String>("a", "c"));
DataSet<Tuple2<String, String>> edges = env.fromElements(new Tuple2<>("a", "c"));
Graph<String, Double, NullValue> graph = Graph.fromTupleDataSet(initialVertices,
edges.map(new MapFunction<Tuple2<String,String>, Tuple3<String, String, NullValue>>() {
public Tuple3<String, String, NullValue> map(
Tuple2<String, String> edge) {
return new Tuple3<String, String, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
return new Tuple3<>(edge.f0, edge.f1, NullValue.getInstance());
}
}), env);
......@@ -187,7 +187,7 @@ public class SpargelTranslationTest {
assertTrue(result instanceof DeltaIterationResultSet);
DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
DeltaIteration<?, ?> iteration = resultSet.getIterationHead();
// check the basic iteration properties
assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
......
......@@ -78,7 +78,7 @@ public class CollectionModeSuperstepITCase {
public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
public Long map(Vertex<Long, Long> value) {
return 1l;
return 1L;
}
}
}
\ No newline at end of file
......@@ -382,11 +382,10 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
private static final class FindAllReachableVertices extends SumFunction<HashSet<Long>, Long, HashSet<Long>> {
@Override
public HashSet<Long> sum(HashSet<Long> newSet, HashSet<Long> currentSet) {
HashSet<Long> set = currentSet;
for(Long l : newSet) {
set.add(l);
currentSet.add(l);
}
return set;
return currentSet;
}
}
......
......@@ -79,7 +79,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
new InitMapperSSSP(), env);
List<Vertex<Long, Double>> result = inputGraph.run(
new GSASingleSourceShortestPaths<Long>(1l, 16)).collect();
new GSASingleSourceShortestPaths<>(1L, 16)).collect();
expectedResult = "1,0.0\n" +
"2,12.0\n" +
......
......@@ -32,176 +32,156 @@ import org.apache.flink.graph.Vertex;
public class TestGraphUtils {
public static final DataSet<Vertex<Long, Long>> getLongLongVertexData(
public static DataSet<Vertex<Long, Long>> getLongLongVertexData(
ExecutionEnvironment env) {
return env.fromCollection(getLongLongVertices());
}
public static final DataSet<Edge<Long, Long>> getLongLongEdgeData(
public static DataSet<Edge<Long, Long>> getLongLongEdgeData(
ExecutionEnvironment env) {
return env.fromCollection(getLongLongEdges());
}
public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
public static DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcData(
ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = getLongLongEdges();
edges.remove(1);
edges.add(new Edge<Long, Long>(13L, 3L, 13L));
edges.add(new Edge<>(13L, 3L, 13L));
return env.fromCollection(edges);
}
public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
public static DataSet<Edge<Long, Long>> getLongLongEdgeInvalidTrgData(
ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = getLongLongEdges();
edges.remove(0);
edges.add(new Edge<Long, Long>(3L, 13L, 13L));
edges.add(new Edge<>(3L, 13L, 13L));
return env.fromCollection(edges);
}
public static final DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
public static DataSet<Edge<Long, Long>> getLongLongEdgeInvalidSrcTrgData(
ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = getLongLongEdges();
edges.remove(0);
edges.remove(1);
edges.remove(2);
edges.add(new Edge<Long, Long>(13L, 3L, 13L));
edges.add(new Edge<Long, Long>(1L, 12L, 12L));
edges.add(new Edge<Long, Long>(13L, 33L, 13L));
edges.add(new Edge<>(13L, 3L, 13L));
edges.add(new Edge<>(1L, 12L, 12L));
edges.add(new Edge<>(13L, 33L, 13L));
return env.fromCollection(edges);
}
public static final DataSet<Edge<String, Long>> getStringLongEdgeData(
public static DataSet<Edge<String, Long>> getStringLongEdgeData(
ExecutionEnvironment env) {
List<Edge<String, Long>> edges = new ArrayList<Edge<String, Long>>();
edges.add(new Edge<String, Long>("1", "2", 12L));
edges.add(new Edge<String, Long>("1", "3", 13L));
edges.add(new Edge<String, Long>("2", "3", 23L));
edges.add(new Edge<String, Long>("3", "4", 34L));
edges.add(new Edge<String, Long>("3", "5", 35L));
edges.add(new Edge<String, Long>("4", "5", 45L));
edges.add(new Edge<String, Long>("5", "1", 51L));
List<Edge<String, Long>> edges = new ArrayList<>();
edges.add(new Edge<>("1", "2", 12L));
edges.add(new Edge<>("1", "3", 13L));
edges.add(new Edge<>("2", "3", 23L));
edges.add(new Edge<>("3", "4", 34L));
edges.add(new Edge<>("3", "5", 35L));
edges.add(new Edge<>("4", "5", 45L));
edges.add(new Edge<>("5", "1", 51L));
return env.fromCollection(edges);
}
public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2Data(
public static DataSet<Tuple2<Long, Long>> getLongLongTuple2Data(
ExecutionEnvironment env) {
List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
tuples.add(new Tuple2<Long, Long>(1L, 10L));
tuples.add(new Tuple2<Long, Long>(2L, 20L));
tuples.add(new Tuple2<Long, Long>(3L, 30L));
tuples.add(new Tuple2<Long, Long>(4L, 40L));
tuples.add(new Tuple2<Long, Long>(6L, 60L));
List<Tuple2<Long, Long>> tuples = new ArrayList<>();
tuples.add(new Tuple2<>(1L, 10L));
tuples.add(new Tuple2<>(2L, 20L));
tuples.add(new Tuple2<>(3L, 30L));
tuples.add(new Tuple2<>(4L, 40L));
tuples.add(new Tuple2<>(6L, 60L));
return env.fromCollection(tuples);
}
public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
public static DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
ExecutionEnvironment env) {
List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
tuples.add(new Tuple2<Long, Long>(1L, 10L));
tuples.add(new Tuple2<Long, Long>(1L, 20L));
tuples.add(new Tuple2<Long, Long>(2L, 30L));
tuples.add(new Tuple2<Long, Long>(3L, 40L));
tuples.add(new Tuple2<Long, Long>(3L, 50L));
tuples.add(new Tuple2<Long, Long>(4L, 60L));
tuples.add(new Tuple2<Long, Long>(6L, 70L));
List<Tuple2<Long, Long>> tuples = new ArrayList<>();
tuples.add(new Tuple2<>(1L, 10L));
tuples.add(new Tuple2<>(1L, 20L));
tuples.add(new Tuple2<>(2L, 30L));
tuples.add(new Tuple2<>(3L, 40L));
tuples.add(new Tuple2<>(3L, 50L));
tuples.add(new Tuple2<>(4L, 60L));
tuples.add(new Tuple2<>(6L, 70L));
return env.fromCollection(tuples);
}
public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
public static DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
ExecutionEnvironment env) {
List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
tuples.add(new Tuple2<Long, Long>(2L, 10L));
tuples.add(new Tuple2<Long, Long>(3L, 20L));
tuples.add(new Tuple2<Long, Long>(3L, 30L));
tuples.add(new Tuple2<Long, Long>(4L, 40L));
tuples.add(new Tuple2<Long, Long>(6L, 50L));
tuples.add(new Tuple2<Long, Long>(6L, 60L));
tuples.add(new Tuple2<Long, Long>(1L, 70L));
List<Tuple2<Long, Long>> tuples = new ArrayList<>();
tuples.add(new Tuple2<>(2L, 10L));
tuples.add(new Tuple2<>(3L, 20L));
tuples.add(new Tuple2<>(3L, 30L));
tuples.add(new Tuple2<>(4L, 40L));
tuples.add(new Tuple2<>(6L, 50L));
tuples.add(new Tuple2<>(6L, 60L));
tuples.add(new Tuple2<>(1L, 70L));
return env.fromCollection(tuples);
}
public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
public static DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
ExecutionEnvironment env) {
List<Tuple3<Long, Long, Long>> tuples = new ArrayList<Tuple3<Long, Long, Long>>();
tuples.add(new Tuple3<Long, Long, Long>(1L, 2L, 12L));
tuples.add(new Tuple3<Long, Long, Long>(1L, 3L, 13L));
tuples.add(new Tuple3<Long, Long, Long>(2L, 3L, 23L));
tuples.add(new Tuple3<Long, Long, Long>(3L, 4L, 34L));
tuples.add(new Tuple3<Long, Long, Long>(3L, 6L, 36L));
tuples.add(new Tuple3<Long, Long, Long>(4L, 6L, 46L));
tuples.add(new Tuple3<Long, Long, Long>(6L, 1L, 61L));
List<Tuple3<Long, Long, Long>> tuples = new ArrayList<>();
tuples.add(new Tuple3<>(1L, 2L, 12L));
tuples.add(new Tuple3<>(1L, 3L, 13L));
tuples.add(new Tuple3<>(2L, 3L, 23L));
tuples.add(new Tuple3<>(3L, 4L, 34L));
tuples.add(new Tuple3<>(3L, 6L, 36L));
tuples.add(new Tuple3<>(4L, 6L, 46L));
tuples.add(new Tuple3<>(6L, 1L, 61L));
return env.fromCollection(tuples);
}
public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data(
public static DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2Data(
ExecutionEnvironment env) {
List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
DummyCustomParameterizedType<Float>>>();
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
new DummyCustomParameterizedType<Float>(10, 10f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
new DummyCustomParameterizedType<Float>(20, 20f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
new DummyCustomParameterizedType<Float>(30, 30f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L,
new DummyCustomParameterizedType<Float>(40, 40f)));
List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(10, 10f)));
tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(20, 20f)));
tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(30, 30f)));
tuples.add(new Tuple2<>(4L, new DummyCustomParameterizedType<>(40, 40f)));
return env.fromCollection(tuples);
}
public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData(
public static DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2SourceData(
ExecutionEnvironment env) {
List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
DummyCustomParameterizedType<Float>>>();
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
new DummyCustomParameterizedType<Float>(10, 10f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
new DummyCustomParameterizedType<Float>(20, 20f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
new DummyCustomParameterizedType<Float>(30, 30f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
new DummyCustomParameterizedType<Float>(40, 40f)));
List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(10, 10f)));
tuples.add(new Tuple2<>(1L, new DummyCustomParameterizedType<>(20, 20f)));
tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(30, 30f)));
tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(40, 40f)));
return env.fromCollection(tuples);
}
public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData(
public static DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>> getLongCustomTuple2TargetData(
ExecutionEnvironment env) {
List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<Tuple2<Long,
DummyCustomParameterizedType<Float>>>();
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
new DummyCustomParameterizedType<Float>(10, 10f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
new DummyCustomParameterizedType<Float>(20, 20f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
new DummyCustomParameterizedType<Float>(30, 30f)));
tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L,
new DummyCustomParameterizedType<Float>(40, 40f)));
List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
tuples.add(new Tuple2<>(2L, new DummyCustomParameterizedType<>(10, 10f)));
tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(20, 20f)));
tuples.add(new Tuple2<>(3L, new DummyCustomParameterizedType<>(30, 30f)));
tuples.add(new Tuple2<>(4L, new DummyCustomParameterizedType<>(40, 40f)));
return env.fromCollection(tuples);
}
public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
public static DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> getLongLongCustomTuple3Data(
ExecutionEnvironment env) {
List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples =
new ArrayList<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>>();
tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 2L,
new DummyCustomParameterizedType<Float>(10, 10f)));
tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L, 3L,
new DummyCustomParameterizedType<Float>(20, 20f)));
tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(2L, 3L,
new DummyCustomParameterizedType<Float>(30, 30f)));
tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(3L, 4L,
new DummyCustomParameterizedType<Float>(40, 40f)));
List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples = new ArrayList<>();
tuples.add(new Tuple3<>(1L, 2L, new DummyCustomParameterizedType<>(10, 10f)));
tuples.add(new Tuple3<>(1L, 3L, new DummyCustomParameterizedType<>(20, 20f)));
tuples.add(new Tuple3<>(2L, 3L, new DummyCustomParameterizedType<>(30, 30f)));
tuples.add(new Tuple3<>(3L, 4L, new DummyCustomParameterizedType<>(40, 40f)));
return env.fromCollection(tuples);
}
......@@ -209,12 +189,12 @@ public class TestGraphUtils {
/**
* A graph with invalid vertex ids
*/
public static final DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
public static DataSet<Vertex<Long, Long>> getLongLongInvalidVertexData(
ExecutionEnvironment env) {
List<Vertex<Long, Long>> vertices = getLongLongVertices();
vertices.remove(0);
vertices.add(new Vertex<Long, Long>(15L, 1L));
vertices.add(new Vertex<>(15L, 1L));
return env.fromCollection(vertices);
}
......@@ -222,15 +202,15 @@ public class TestGraphUtils {
/**
* A graph that has at least one vertex with no ingoing/outgoing edges
*/
public static final DataSet<Edge<Long, Long>> getLongLongEdgeDataWithZeroDegree(
public static DataSet<Edge<Long, Long>> getLongLongEdgeDataWithZeroDegree(
ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(1L, 2L, 12L));
edges.add(new Edge<Long, Long>(1L, 4L, 14L));
edges.add(new Edge<Long, Long>(1L, 5L, 15L));
edges.add(new Edge<Long, Long>(2L, 3L, 23L));
edges.add(new Edge<Long, Long>(3L, 5L, 35L));
edges.add(new Edge<Long, Long>(4L, 5L, 45L));
List<Edge<Long, Long>> edges = new ArrayList<>();
edges.add(new Edge<>(1L, 2L, 12L));
edges.add(new Edge<>(1L, 4L, 14L));
edges.add(new Edge<>(1L, 5L, 15L));
edges.add(new Edge<>(2L, 3L, 23L));
edges.add(new Edge<>(3L, 5L, 35L));
edges.add(new Edge<>(4L, 5L, 45L));
return env.fromCollection(edges);
}
......@@ -238,35 +218,34 @@ public class TestGraphUtils {
/**
* Function that produces an ArrayList of vertices
*/
public static final List<Vertex<Long, Long>> getLongLongVertices() {
List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
vertices.add(new Vertex<Long, Long>(1L, 1L));
vertices.add(new Vertex<Long, Long>(2L, 2L));
vertices.add(new Vertex<Long, Long>(3L, 3L));
vertices.add(new Vertex<Long, Long>(4L, 4L));
vertices.add(new Vertex<Long, Long>(5L, 5L));
public static List<Vertex<Long, Long>> getLongLongVertices() {
List<Vertex<Long, Long>> vertices = new ArrayList<>();
vertices.add(new Vertex<>(1L, 1L));
vertices.add(new Vertex<>(2L, 2L));
vertices.add(new Vertex<>(3L, 3L));
vertices.add(new Vertex<>(4L, 4L));
vertices.add(new Vertex<>(5L, 5L));
return vertices;
}
public static final List<Vertex<Long, Boolean>> getLongBooleanVertices() {
List<Vertex<Long, Boolean>> vertices = new ArrayList<Vertex<Long, Boolean>>();
vertices.add(new Vertex<Long, Boolean>(1L, true));
vertices.add(new Vertex<Long, Boolean>(2L, true));
vertices.add(new Vertex<Long, Boolean>(3L, true));
vertices.add(new Vertex<Long, Boolean>(4L, true));
vertices.add(new Vertex<Long, Boolean>(5L, true));
public static List<Vertex<Long, Boolean>> getLongBooleanVertices() {
List<Vertex<Long, Boolean>> vertices = new ArrayList<>();
vertices.add(new Vertex<>(1L, true));
vertices.add(new Vertex<>(2L, true));
vertices.add(new Vertex<>(3L, true));
vertices.add(new Vertex<>(4L, true));
vertices.add(new Vertex<>(5L, true));
return vertices;
}
public static final DataSet<Edge<Long, Long>> getDisconnectedLongLongEdgeData(
ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(1L, 2L, 12L));
edges.add(new Edge<Long, Long>(1L, 3L, 13L));
edges.add(new Edge<Long, Long>(2L, 3L, 23L));
edges.add(new Edge<Long, Long>(4L, 5L, 45L));
public static DataSet<Edge<Long, Long>> getDisconnectedLongLongEdgeData(ExecutionEnvironment env) {
List<Edge<Long, Long>> edges = new ArrayList<>();
edges.add(new Edge<>(1L, 2L, 12L));
edges.add(new Edge<>(1L, 3L, 13L));
edges.add(new Edge<>(2L, 3L, 23L));
edges.add(new Edge<>(4L, 5L, 45L));
return env.fromCollection(edges);
}
......@@ -274,15 +253,15 @@ public class TestGraphUtils {
/**
* Function that produces an ArrayList of edges
*/
public static final List<Edge<Long, Long>> getLongLongEdges() {
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(1L, 2L, 12L));
edges.add(new Edge<Long, Long>(1L, 3L, 13L));
edges.add(new Edge<Long, Long>(2L, 3L, 23L));
edges.add(new Edge<Long, Long>(3L, 4L, 34L));
edges.add(new Edge<Long, Long>(3L, 5L, 35L));
edges.add(new Edge<Long, Long>(4L, 5L, 45L));
edges.add(new Edge<Long, Long>(5L, 1L, 51L));
public static List<Edge<Long, Long>> getLongLongEdges() {
List<Edge<Long, Long>> edges = new ArrayList<>();
edges.add(new Edge<>(1L, 2L, 12L));
edges.add(new Edge<>(1L, 3L, 13L));
edges.add(new Edge<>(2L, 3L, 23L));
edges.add(new Edge<>(3L, 4L, 34L));
edges.add(new Edge<>(3L, 5L, 35L));
edges.add(new Edge<>(4L, 5L, 45L));
edges.add(new Edge<>(5L, 1L, 51L));
return edges;
}
......@@ -373,45 +352,41 @@ public class TestGraphUtils {
/**
* utils for getting the second graph for the test of method difference();
* @param env
* @param env - ExecutionEnvironment
*/
public static final DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference(
ExecutionEnvironment env){
public static DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference(ExecutionEnvironment env) {
return env.fromCollection(getLongLongEdgesForDifference());
}
public static final DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference2(
ExecutionEnvironment env){
public static DataSet<Edge<Long,Long>> getLongLongEdgeDataDifference2(ExecutionEnvironment env) {
return env.fromCollection(getLongLongEdgesForDifference2());
}
public static final DataSet<Vertex<Long,Long>> getLongLongVertexDataDifference(
ExecutionEnvironment env)
{
public static DataSet<Vertex<Long,Long>> getLongLongVertexDataDifference(ExecutionEnvironment env) {
return env.fromCollection(getVerticesForDifference());
}
public static final List<Vertex<Long,Long>> getVerticesForDifference(){
List<Vertex<Long,Long>> vertices = new ArrayList<Vertex<Long,Long>>();
vertices.add(new Vertex<Long, Long>(1L, 1L));
vertices.add(new Vertex<Long, Long>(3L, 3L));
vertices.add(new Vertex<Long, Long>(6L, 6L));
public static List<Vertex<Long,Long>> getVerticesForDifference(){
List<Vertex<Long,Long>> vertices = new ArrayList<>();
vertices.add(new Vertex<>(1L, 1L));
vertices.add(new Vertex<>(3L, 3L));
vertices.add(new Vertex<>(6L, 6L));
return vertices;
}
public static final List<Edge<Long, Long>> getLongLongEdgesForDifference() {
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(1L, 3L, 13L));
edges.add(new Edge<Long, Long>(1L, 6L, 26L));
edges.add(new Edge<Long, Long>(6L, 3L, 63L));
public static List<Edge<Long, Long>> getLongLongEdgesForDifference() {
List<Edge<Long, Long>> edges = new ArrayList<>();
edges.add(new Edge<>(1L, 3L, 13L));
edges.add(new Edge<>(1L, 6L, 26L));
edges.add(new Edge<>(6L, 3L, 63L));
return edges;
}
public static final List<Edge<Long, Long>> getLongLongEdgesForDifference2() {
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
edges.add(new Edge<Long, Long>(6L, 6L, 66L));
public static List<Edge<Long, Long>> getLongLongEdgesForDifference2() {
List<Edge<Long, Long>> edges = new ArrayList<>();
edges.add(new Edge<>(6L, 6L, 66L));
return edges;
}
}
\ No newline at end of file
......@@ -75,7 +75,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
new UpdateFunction(), new MessageFunction(), 10, parameters);
DataSet<Vertex<Long,Long>> data = res.getVertices();
List<Vertex<Long,Long>> result= data.collect();
List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,11\n" +
"2,11\n" +
......@@ -137,7 +137,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet<Tuple2<Long, Long>> data = res.getVertices().map(new VertexToTuple2Map<Long, Long>());
List<Tuple2<Long, Long>> result= data.collect();
List<Tuple2<Long, Long>> result= data.collect();
expectedResult = "1,6\n" +
"2,6\n" +
......@@ -231,7 +231,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.runVertexCentricIteration(new VertexUpdateDirection(), new IdMessengerAll(), 5, parameters)
.getVertices();
List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
List<Vertex<Long, HashSet<Long>>> result= resultedVertices.collect();
expectedResult = "1,[2, 3, 5]\n" +
"2,[1, 3]\n" +
......@@ -264,7 +264,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
.getVertices();
List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
expectedResult = "1,[2, 3]\n" +
"2,[3]\n" +
......@@ -297,7 +297,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
.getVertices();
List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
expectedResult = "1,[5]\n" +
"2,[1]\n" +
......@@ -330,7 +330,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
.runVertexCentricIteration(new VertexUpdateDirection(), new SendMsgToAll(), 5, parameters)
.getVertices();
List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
List<Vertex<Long, HashSet<Long>>> result = resultedVertices.collect();
expectedResult = "1,[2, 3, 5]\n" +
"2,[1, 3]\n" +
......@@ -356,7 +356,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
new DummyMessageFunction(), 2).getVertices();
List<Vertex<Long, Long>> result= verticesWithNumVertices.collect();
List<Vertex<Long, Long>> result= verticesWithNumVertices.collect();
expectedResult = "1,-1\n" +
"2,-1\n" +
......@@ -388,7 +388,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
expectedResult = "1,1\n" +
"2,1\n" +
......@@ -413,7 +413,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
expectedResult = "1,-1\n" +
"2,-1\n" +
......@@ -445,7 +445,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
expectedResult = "1,2\n" +
"2,1\n" +
......@@ -470,7 +470,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
List<Vertex<Long, Long>> result= verticesWithDegrees.collect();
expectedResult = "1,-1\n" +
"2,-1\n" +
......@@ -502,7 +502,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
DataSet<Vertex<Long, Boolean>> verticesWithNumNeighbors = graph.runVertexCentricIteration(
new VertexUpdateNumNeighbors(), new IdMessenger(), 1, parameters).getVertices();
List<Vertex<Long, Boolean>> result= verticesWithNumNeighbors.collect();
List<Vertex<Long, Boolean>> result= verticesWithNumNeighbors.collect();
expectedResult = "1,true\n" +
"2,true\n" +
......@@ -548,8 +548,6 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> {
LongSumAggregator aggregator = new LongSumAggregator();
@Override
public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
......@@ -643,11 +641,11 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@Override
public void sendMessages(Vertex<Long, Long> vertex) {
if (vertex.getId().equals(1)) {
if (vertex.getId() == 1) {
Assert.assertEquals(2, getOutDegree());
Assert.assertEquals(1, getInDegree());
}
else if(vertex.getId().equals(3)) {
else if(vertex.getId() == 3) {
Assert.assertEquals(2, getOutDegree());
Assert.assertEquals(2, getInDegree());
}
......@@ -735,7 +733,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@Override
public void sendMessages(Vertex<Long, HashSet<Long>> vertex) throws Exception {
for (Edge<Long, Long> edge : getEdges()) {
if(edge.getSource() != vertex.getId()) {
if(!edge.getSource().equals(vertex.getId())) {
sendMessageTo(edge.getSource(), vertex.getId());
} else {
sendMessageTo(edge.getTarget(), vertex.getId());
......@@ -759,7 +757,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@Override
public void sendMessages(Vertex<Long, Boolean> vertex) throws Exception {
for (Edge<Long, Long> edge : getEdges()) {
if(edge.getSource() != vertex.getId()) {
if(!edge.getSource().equals(vertex.getId())) {
sendMessageTo(edge.getSource(), vertex.getId());
} else {
sendMessageTo(edge.getTarget(), vertex.getId());
......@@ -783,7 +781,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
public Long map(Vertex<Long, Long> value) {
return 1l;
return 1L;
}
}
......@@ -792,7 +790,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@Override
public HashSet<Long> map(Vertex<Long, Long> value) throws Exception {
return new HashSet<Long>();
return new HashSet<>();
}
}
}
......@@ -92,7 +92,7 @@ public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);
// the edge to be removed is a non-SP edge
Edge<Long, Double> edgeToBeRemoved = new Edge<Long, Double>(3L, 5L, 5.0);
Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);
Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
// Assumption: all minimum weight paths are kept
......
......@@ -82,7 +82,7 @@ public class MusicProfilesITCase extends MultipleProgramsTestBase {
public void after() throws Exception {
compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath);
ArrayList<String> list = new ArrayList<String>();
ArrayList<String> list = new ArrayList<>();
readAllResultLines(list, communitiesResultPath, new String[]{}, false);
String[] result = list.toArray(new String[list.size()]);
......
......@@ -73,7 +73,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
@Override
public Vertex<Long, Long> map(Long value) {
return new Vertex<Long, Long>(value, value);
return new Vertex<>(value, value);
}
}
......@@ -87,7 +87,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
public Edge<Long, NullValue> map(String value) {
String[] nums = value.split(" ");
return new Edge<Long, NullValue>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
return new Edge<>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
NullValue.getInstance());
}
}
......
......@@ -41,8 +41,6 @@ public class PageRankITCase extends MultipleProgramsTestBase {
super(mode);
}
private String expectedResult;
@Test
public void testPageRankWithThreeIterations() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
......@@ -53,7 +51,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
.collect();
compareWithDelta(result, expectedResult, 0.01);
compareWithDelta(result, 0.01);
}
@Test
......@@ -66,7 +64,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
.collect();
compareWithDelta(result, expectedResult, 0.01);
compareWithDelta(result, 0.01);
}
@Test
......@@ -79,7 +77,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
.collect();
compareWithDelta(result, expectedResult, 0.01);
compareWithDelta(result, 0.01);
}
@Test
......@@ -92,18 +90,18 @@ public class PageRankITCase extends MultipleProgramsTestBase {
List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
.collect();
compareWithDelta(result, expectedResult, 0.01);
compareWithDelta(result, 0.01);
}
private void compareWithDelta(List<Vertex<Long, Double>> result,
String expectedResult, double delta) {
double delta) {
String resultString = "";
for (Vertex<Long, Double> v : result) {
resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
}
expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");
......
......@@ -34,8 +34,6 @@ import java.util.List;
@RunWith(Parameterized.class)
public class TriangleCountITCase extends MultipleProgramsTestBase {
private String expectedResult;
public TriangleCountITCase(TestExecutionMode mode) {
super(mode);
}
......@@ -49,7 +47,7 @@ public class TriangleCountITCase extends MultipleProgramsTestBase {
env).getUndirected();
List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult));
}
......
......@@ -124,7 +124,7 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
//env.fromElements(result).writeAsText(resultPath);
String res= valid.toString();//env.fromElements(valid);
List<String> result= new LinkedList<String>();
List<String> result= new LinkedList<>();
result.add(res);
expectedResult = "true";
......@@ -144,7 +144,7 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
Boolean valid = graph.validate(new InvalidVertexIdsValidator<Long, Long, Long>());
String res= valid.toString();//env.fromElements(valid);
List<String> result= new LinkedList<String>();
List<String> result= new LinkedList<>();
result.add(res);
expectedResult = "false\n";
......@@ -216,8 +216,7 @@ public class GraphCreationITCase extends MultipleProgramsTestBase {
private static final class AssignCustomVertexValueMapper implements
MapFunction<Long, DummyCustomParameterizedType<Double>> {
DummyCustomParameterizedType<Double> dummyValue =
new DummyCustomParameterizedType<Double>();
DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<>();
public DummyCustomParameterizedType<Double> map(Long vertexId) {
dummyValue.setIntField(vertexId.intValue()-1);
......
......@@ -138,7 +138,7 @@ public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class AssignTuple2ValueMapper implements MapFunction<Long, Tuple2<Long, Long>> {
public Tuple2<Long, Long> map(Long vertexId) {
return new Tuple2<Long, Long>(vertexId*2, 42l);
return new Tuple2<>(vertexId*2, 42L);
}
}
......
......@@ -39,8 +39,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
super(mode);
}
private String expectedResult;
private String expectedResult;
@Test
public void testAddVertex() throws Exception {
......@@ -53,10 +52,10 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.addVertex(new Vertex<Long, Long>(6L, 6L));
graph = graph.addVertex(new Vertex<>(6L, 6L));
DataSet<Vertex<Long,Long>> data = graph.getVertices();
List<Vertex<Long,Long>> result= data.collect();
List<Vertex<Long,Long>> result = data.collect();
expectedResult = "1,1\n" +
"2,2\n" +
......@@ -64,7 +63,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"4,4\n" +
"5,5\n" +
"6,6\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -79,14 +78,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
vertices.add(new Vertex<Long, Long>(6L, 6L));
vertices.add(new Vertex<Long, Long>(7L, 7L));
List<Vertex<Long, Long>> vertices = new ArrayList<>();
vertices.add(new Vertex<>(6L, 6L));
vertices.add(new Vertex<>(7L, 7L));
graph = graph.addVertices(vertices);
DataSet<Vertex<Long,Long>> data = graph.getVertices();
List<Vertex<Long,Long>> result= data.collect();
List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,1\n" +
"2,2\n" +
......@@ -95,7 +94,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"5,5\n" +
"6,6\n" +
"7,7\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -109,17 +108,17 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.addVertex(new Vertex<Long, Long>(1L, 1L));
graph = graph.addVertex(new Vertex<>(1L, 1L));
DataSet<Vertex<Long,Long>> data = graph.getVertices();
List<Vertex<Long,Long>> result= data.collect();
List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,1\n" +
"2,2\n" +
"3,3\n" +
"4,4\n" +
"5,5\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -134,21 +133,21 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
vertices.add(new Vertex<Long, Long>(1L, 1L));
vertices.add(new Vertex<Long, Long>(3L, 3L));
List<Vertex<Long, Long>> vertices = new ArrayList<>();
vertices.add(new Vertex<>(1L, 1L));
vertices.add(new Vertex<>(3L, 3L));
graph = graph.addVertices(vertices);
DataSet<Vertex<Long,Long>> data = graph.getVertices();
List<Vertex<Long,Long>> result= data.collect();
List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,1\n" +
"2,2\n" +
"3,3\n" +
"4,4\n" +
"5,5\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -163,14 +162,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
vertices.add(new Vertex<Long, Long>(1L, 1L));
vertices.add(new Vertex<Long, Long>(6L, 6L));
List<Vertex<Long, Long>> vertices = new ArrayList<>();
vertices.add(new Vertex<>(1L, 1L));
vertices.add(new Vertex<>(6L, 6L));
graph = graph.addVertices(vertices);
DataSet<Vertex<Long,Long>> data = graph.getVertices();
List<Vertex<Long,Long>> result= data.collect();
List<Vertex<Long,Long>> result= data.collect();
expectedResult = "1,1\n" +
"2,2\n" +
......@@ -178,7 +177,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"4,4\n" +
"5,5\n" +
"6,6\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -192,16 +191,16 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.removeVertex(new Vertex<Long, Long>(5L, 5L));
graph = graph.removeVertex(new Vertex<>(5L, 5L));
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"2,3,23\n" +
"3,4,34\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -216,19 +215,19 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
verticesToBeRemoved.add(new Vertex<Long, Long>(1L, 1L));
verticesToBeRemoved.add(new Vertex<Long, Long>(2L, 2L));
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
verticesToBeRemoved.add(new Vertex<>(1L, 1L));
verticesToBeRemoved.add(new Vertex<>(2L, 2L));
graph = graph.removeVertices(verticesToBeRemoved);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -236,16 +235,16 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
public void testRemoveInvalidVertex() throws Exception {
/*
* Test removeVertex() -- remove an invalid vertex
*/
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.removeVertex(new Vertex<Long, Long>(6L, 6L));
graph = graph.removeVertex(new Vertex<>(6L, 6L));
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -254,7 +253,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -268,20 +267,20 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
verticesToBeRemoved.add(new Vertex<Long, Long>(1L, 1L));
verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
verticesToBeRemoved.add(new Vertex<>(1L, 1L));
verticesToBeRemoved.add(new Vertex<>(7L, 7L));
graph = graph.removeVertices(verticesToBeRemoved);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "2,3,23\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -295,14 +294,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
verticesToBeRemoved.add(new Vertex<Long, Long>(6L, 6L));
verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
verticesToBeRemoved.add(new Vertex<>(6L, 6L));
verticesToBeRemoved.add(new Vertex<>(7L, 7L));
graph = graph.removeVertices(verticesToBeRemoved);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -311,7 +310,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -325,39 +324,38 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<Vertex<Long, Long>>();
verticesToBeRemoved.add(new Vertex<Long, Long>(6L, 6L));
verticesToBeRemoved.add(new Vertex<Long, Long>(7L, 7L));
List<Vertex<Long, Long>> verticesToBeRemoved = new ArrayList<>();
verticesToBeRemoved.add(new Vertex<>(6L, 6L));
verticesToBeRemoved.add(new Vertex<>(7L, 7L));
graph = graph.removeVertices(verticesToBeRemoved);
DataSet<Vertex<Long,Long>> data = graph.getVertices();
List<Vertex<Long, Long>> result= data.collect();
DataSet<Vertex<Long,Long>> data = graph.getVertices();
List<Vertex<Long, Long>> result= data.collect();
expectedResult = "1,1\n" +
"2,2\n" +
"3,3\n" +
"4,4\n" +
"5,5\n";
compareResultAsTuples(result, expectedResult);
}
@Test
public void testAddEdge() throws Exception {
/*
* Test addEdge() -- simple case
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.addEdge(new Vertex<Long, Long>(6L, 6L), new Vertex<Long, Long>(1L, 1L),
61L);
graph = graph.addEdge(new Vertex<>(6L, 6L), new Vertex<>(1L, 1L), 61L);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -366,8 +364,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n" +
"6,1,61\n";
"6,1,61\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -382,14 +380,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<Edge<Long, Long>>();
edgesToBeAdded.add(new Edge<Long, Long>(2L, 4L, 24L));
edgesToBeAdded.add(new Edge<Long, Long>(4L, 1L, 41L));
List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<>();
edgesToBeAdded.add(new Edge<>(2L, 4L, 24L));
edgesToBeAdded.add(new Edge<>(4L, 1L, 41L));
graph = graph.addEdges(edgesToBeAdded);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -400,7 +398,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"4,1,41\n" +
"4,5,45\n" +
"5,1,51\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -415,14 +413,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<Edge<Long, Long>>();
edgesToBeAdded.add(new Edge<Long, Long>(6L, 1L, 61L));
edgesToBeAdded.add(new Edge<Long, Long>(7L, 1L, 71L));
List<Edge<Long, Long>> edgesToBeAdded = new ArrayList<>();
edgesToBeAdded.add(new Edge<>(6L, 1L, 61L));
edgesToBeAdded.add(new Edge<>(7L, 1L, 71L));
graph = graph.addEdges(edgesToBeAdded);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -431,7 +429,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -445,11 +443,11 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.addEdge(new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L),
graph = graph.addEdge(new Vertex<>(1L, 1L), new Vertex<>(2L, 2L),
12L);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,2,12\n" +
......@@ -458,8 +456,8 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
"5,1,51\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -473,10 +471,10 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.removeEdge(new Edge<Long, Long>(5L, 1L, 51L));
graph = graph.removeEdge(new Edge<>(5L, 1L, 51L));
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -484,7 +482,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -498,21 +496,21 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
edgesToBeRemoved.add(new Edge<Long, Long>(2L, 3L, 23L));
List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<>();
edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L));
edgesToBeRemoved.add(new Edge<>(2L, 3L, 23L));
graph = graph.removeEdges(edgesToBeRemoved);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -526,14 +524,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
edgesToBeRemoved.add(new Edge<Long, Long>(5L, 1L, 51L));
List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<>();
edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L));
edgesToBeRemoved.add(new Edge<>(5L, 1L, 51L));
graph = graph.removeEdges(edgesToBeRemoved);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -541,7 +539,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,4,34\n" +
"3,5,35\n" +
"4,5,45\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -552,13 +550,13 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
*/
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
graph = graph.removeEdge(new Edge<Long, Long>(6L, 1L, 61L));
graph = graph.removeEdge(new Edge<>(6L, 1L, 61L));
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......@@ -567,7 +565,7 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
"3,5,35\n" +
"4,5,45\n" +
"5,1,51\n";
compareResultAsTuples(result, expectedResult);
}
......@@ -581,14 +579,14 @@ public class GraphMutationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<Edge<Long, Long>>();
edgesToBeRemoved.add(new Edge<Long, Long>(1L, 1L, 51L));
edgesToBeRemoved.add(new Edge<Long, Long>(6L, 1L, 61L));
List<Edge<Long, Long>> edgesToBeRemoved = new ArrayList<>();
edgesToBeRemoved.add(new Edge<>(1L, 1L, 51L));
edgesToBeRemoved.add(new Edge<>(6L, 1L, 61L));
graph = graph.removeEdges(edgesToBeRemoved);
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
DataSet<Edge<Long,Long>> data = graph.getEdges();
List<Edge<Long, Long>> result= data.collect();
expectedResult = "1,2,12\n" +
"1,3,13\n" +
......
......@@ -263,11 +263,11 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
List<Edge<Long, Long>> edges = new ArrayList<Edge<Long, Long>>();
List<Vertex<Long, Long>> vertices = new ArrayList<>();
List<Edge<Long, Long>> edges = new ArrayList<>();
vertices.add(new Vertex<Long, Long>(6L, 6L));
edges.add(new Edge<Long, Long>(6L, 1L, 61L));
vertices.add(new Vertex<>(6L, 6L));
edges.add(new Edge<>(6L, 1L, 61L));
graph = graph.union(Graph.fromCollection(vertices, edges, env));
......@@ -339,7 +339,7 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
TestGraphUtils.getLongLongEdgeData(env), env);
DataSet<Vertex<Long, Long>> vertex = env.fromElements(new Vertex<Long, Long>(6L, 6L));
DataSet<Vertex<Long, Long>> vertex = env.fromElements(new Vertex<>(6L, 6L));
Graph<Long, Long, Long> graph2 = Graph.fromDataSet(vertex,TestGraphUtils.getLongLongEdgeDataDifference2(env),env);
......
......@@ -473,8 +473,7 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
return new Tuple3<Long, Long, Boolean>(edge.getSource(),
edge.getTarget(), true);
return new Tuple3<>(edge.getSource(), edge.getTarget(), true);
}
}
......@@ -512,28 +511,28 @@ public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
return new Tuple2<Long, Long>(edge.getSource(), edge.getValue());
return new Tuple2<>(edge.getSource(), edge.getValue());
}
}
@SuppressWarnings("serial")
private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
return new Tuple2<Long, Boolean>(edge.getSource(), true);
return new Tuple2<>(edge.getSource(), true);
}
}
@SuppressWarnings("serial")
private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
return new Tuple2<Long, Long>(edge.getTarget(), edge.getValue());
return new Tuple2<>(edge.getTarget(), edge.getValue());
}
}
@SuppressWarnings("serial")
private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
return new Tuple2<Long, Boolean>(edge.getTarget(), true);
return new Tuple2<>(edge.getTarget(), true);
}
}
}
\ No newline at end of file
......@@ -184,7 +184,7 @@ public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
return new Tuple2<Long, Boolean>(vertex.getId(), true);
return new Tuple2<>(vertex.getId(), true);
}
}
......
......@@ -181,7 +181,7 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
Tuple1<Long> tupleValue = new Tuple1<Long>();
Tuple1<Long> tupleValue = new Tuple1<>();
tupleValue.setFields(edge.getValue());
return tupleValue;
}
......@@ -201,7 +201,7 @@ public class MapEdgesITCase extends MultipleProgramsTestBase {
DummyCustomParameterizedType<Double>> {
public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<>();
dummyValue.setIntField(edge.getValue().intValue());
dummyValue.setTField(new Double(edge.getValue()));
return dummyValue;
......
......@@ -190,7 +190,7 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
@SuppressWarnings("serial")
private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
Tuple1<Long> tupleValue = new Tuple1<Long>();
Tuple1<Long> tupleValue = new Tuple1<>();
tupleValue.setFields(vertex.getValue());
return tupleValue;
}
......@@ -210,7 +210,7 @@ public class MapVerticesITCase extends MultipleProgramsTestBase {
DummyCustomParameterizedType<Double>> {
public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<>();
dummyValue.setIntField(vertex.getValue().intValue());
dummyValue.setTField(new Double(vertex.getValue()));
return dummyValue;
......
......@@ -19,6 +19,7 @@
package org.apache.flink.graph.test.operations;
import java.util.List;
import java.util.Objects;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
......@@ -414,7 +415,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
minNeighborId = edge.getTarget();
}
}
out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
out.collect(new Tuple2<>(v.getId(), minNeighborId));
}
}
......@@ -432,7 +433,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
weight = edge.getValue();
}
}
out.collect(new Tuple2<Long, Long>(v.getId(), weight));
out.collect(new Tuple2<>(v.getId(), weight));
}
}
......@@ -470,7 +471,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
minNeighborId = edge.getSource();
}
}
out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
out.collect(new Tuple2<>(v.getId(), minNeighborId));
}
}
......@@ -482,7 +483,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
Collector<Tuple2<Long, Long>> out) throws Exception {
for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
}
}
}
......@@ -496,7 +497,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
if(edge.f0 != 5) {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
}
}
}
......@@ -511,7 +512,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
Collector<Tuple2<Long, Long>> out) throws Exception {
for (Edge<Long, Long> edge: edges) {
if(v.getValue() > 2) {
out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
}
}
}
......@@ -525,7 +526,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
Collector<Tuple2<Long, Long>> out) throws Exception {
for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
}
}
}
......@@ -539,7 +540,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
if(edge.f0 != 5) {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
}
}
}
......@@ -554,7 +555,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
Collector<Tuple2<Long, Long>> out) throws Exception {
for (Edge<Long, Long> edge: edges) {
if(v.getValue() > 2) {
out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
out.collect(new Tuple2<>(v.getId(), edge.getSource()));
}
}
}
......@@ -567,10 +568,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
Collector<Tuple2<Long, Long>> out) throws Exception {
for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
if (edge.f0 == edge.f1.getTarget()) {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
if (Objects.equals(edge.f0, edge.f1.getTarget())) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
} else {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
}
}
}
......@@ -584,10 +585,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
Collector<Tuple2<Long, Long>> out) throws Exception {
for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
if(edge.f0 != 5 && edge.f0 != 2) {
if (edge.f0 == edge.f1.getTarget()) {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
if (Objects.equals(edge.f0, edge.f1.getTarget())) {
out.collect(new Tuple2<>(edge.f0, edge.f1.getSource()));
} else {
out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
out.collect(new Tuple2<>(edge.f0, edge.f1.getTarget()));
}
}
}
......@@ -604,9 +605,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
for(Edge<Long, Long> edge : edges) {
if(v.getValue() > 4) {
if(v.getId().equals(edge.getTarget())) {
out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
out.collect(new Tuple2<>(v.getId(), edge.getSource()));
} else {
out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
}
}
}
......
......@@ -131,9 +131,9 @@ public class ReduceOnEdgesWithExceptionITCase {
for(Edge<Long, Long> edge : edges) {
if(v.getValue() > 4) {
if(v.getId().equals(edge.getTarget())) {
out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
out.collect(new Tuple2<>(v.getId(), edge.getSource()));
} else {
out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
out.collect(new Tuple2<>(v.getId(), edge.getTarget()));
}
}
}
......
......@@ -420,7 +420,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
out.collect(new Tuple2<>(vertex.getId(), sum));
}
}
......@@ -437,7 +437,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f0.getValue() * neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
out.collect(new Tuple2<>(vertex.getId(), sum));
}
}
......@@ -454,7 +454,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
}
}
......@@ -472,7 +472,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
sum += neighbor.f1.getValue();
}
if(vertex.getId() > 3) {
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
out.collect(new Tuple2<>(vertex.getId(), sum));
}
}
}
......@@ -491,7 +491,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
sum += neighbor.f0.getValue() * neighbor.f1.getValue();
}
if(vertex.getId() > 3) {
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
out.collect(new Tuple2<>(vertex.getId(), sum));
}
}
}
......@@ -510,7 +510,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
sum += neighbor.f1.getValue();
}
if(vertex.getId() > 3) {
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
}
}
}
......@@ -533,13 +533,11 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
Collector<Tuple2<Long, Long>> out) throws Exception {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
neighbors.iterator();
while(neighborsIterator.hasNext()) {
next = neighborsIterator.next();
for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
next = neighbor;
sum += next.f2.getValue() * next.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(next.f0, sum));
out.collect(new Tuple2<>(next.f0, sum));
}
}
......@@ -553,15 +551,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
neighbors.iterator();
while(neighborsIterator.hasNext()) {
next = neighborsIterator.next();
for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
next = neighbor;
sum += next.f2.getValue();
}
if(next.f0 > 2) {
out.collect(new Tuple2<Long, Long>(next.f0, sum));
out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
out.collect(new Tuple2<>(next.f0, sum));
out.collect(new Tuple2<>(next.f0, sum * 2));
}
}
}
......@@ -576,15 +572,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
neighbors.iterator();
while(neighborsIterator.hasNext()) {
next = neighborsIterator.next();
for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
next = neighbor;
sum += next.f2.getValue() * next.f1.getValue();
}
if(next.f0 > 2) {
out.collect(new Tuple2<Long, Long>(next.f0, sum));
out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
out.collect(new Tuple2<>(next.f0, sum));
out.collect(new Tuple2<>(next.f0, sum * 2));
}
}
}
......@@ -599,15 +593,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
long sum = 0;
Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
neighbors.iterator();
while(neighborsIterator.hasNext()) {
next = neighborsIterator.next();
for (Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
next = neighbor;
sum += next.f2.getValue();
}
if(next.f0 > 2) {
out.collect(new Tuple2<Long, Long>(next.f0, sum));
out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
out.collect(new Tuple2<>(next.f0, sum));
out.collect(new Tuple2<>(next.f0, sum * 2));
}
}
}
......@@ -625,8 +617,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum * 2));
out.collect(new Tuple2<>(vertex.getId(), sum));
out.collect(new Tuple2<>(vertex.getId(), sum * 2));
}
}
......@@ -643,8 +635,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f0.getValue() * neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum - 1));
out.collect(new Tuple2<>(vertex.getId(), sum));
out.collect(new Tuple2<>(vertex.getId(), sum - 1));
}
}
......@@ -661,8 +653,8 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue() + 5));
out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue() + 5));
}
}
}
\ No newline at end of file
......@@ -188,7 +188,7 @@ public class ReduceOnNeighborsWithExceptionITCase {
for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
sum += neighbor.f1.getValue();
}
out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
out.collect(new Tuple2<>(vertex.getId(), sum + vertex.getValue()));
}
}
......
......@@ -162,8 +162,8 @@ public class TestBaseUtils extends TestLogger {
if (executor.running()) {
List<ActorRef> tms = executor.getTaskManagersAsJava();
List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<Future<Object>>();
List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<Future<Object>>();
List<Future<Object>> bcVariableManagerResponseFutures = new ArrayList<>();
List<Future<Object>> numActiveConnectionsResponseFutures = new ArrayList<>();
for (ActorRef tm : tms) {
bcVariableManagerResponseFutures.add(Patterns.ask(tm, TestingTaskManagerMessages
......@@ -297,7 +297,7 @@ public class TestBaseUtils extends TestLogger {
public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath,
String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
ArrayList<String> list = new ArrayList<>();
readAllResultLines(list, resultPath, excludePrefixes, false);
String[] result = list.toArray(new String[list.size()]);
......@@ -317,7 +317,7 @@ public class TestBaseUtils extends TestLogger {
public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr,
String resultPath, String[] excludePrefixes) throws Exception {
ArrayList<String> list = new ArrayList<String>();
ArrayList<String> list = new ArrayList<>();
readAllResultLines(list, resultPath, excludePrefixes, true);
String[] result = list.toArray(new String[list.size()]);
......@@ -332,7 +332,7 @@ public class TestBaseUtils extends TestLogger {
Pattern pattern = Pattern.compile(regexp);
Matcher matcher = pattern.matcher("");
ArrayList<String> list = new ArrayList<String>();
ArrayList<String> list = new ArrayList<>();
try {
readAllResultLines(list, resultPath, new String[]{}, false);
} catch (IOException e1) {
......@@ -355,7 +355,7 @@ public class TestBaseUtils extends TestLogger {
public static void compareKeyValuePairsWithDelta(String expectedLines, String resultPath,
String[] excludePrefixes, String delimiter, double maxDelta) throws Exception {
ArrayList<String> list = new ArrayList<String>();
ArrayList<String> list = new ArrayList<>();
readAllResultLines(list, resultPath, excludePrefixes, false);
String[] result = list.toArray(new String[list.size()]);
......@@ -421,9 +421,7 @@ public class TestBaseUtils extends TestLogger {
} else {
throw new IllegalArgumentException("This path does not denote a local file.");
}
} catch (URISyntaxException e) {
throw new IllegalArgumentException("This path does not describe a valid local file URI.");
} catch (NullPointerException e) {
} catch (URISyntaxException | NullPointerException e) {
throw new IllegalArgumentException("This path does not describe a valid local file URI.");
}
}
......@@ -441,7 +439,7 @@ public class TestBaseUtils extends TestLogger {
}
private static <T> void compareResult(List<T> result, String expected, boolean asTuples) {
String[] extectedStrings = expected.split("\n");
String[] expectedStrings = expected.split("\n");
String[] resultStrings = new String[result.size()];
for (int i = 0; i < resultStrings.length; i++) {
......@@ -467,13 +465,13 @@ public class TestBaseUtils extends TestLogger {
}
}
assertEquals("Wrong number of elements result", extectedStrings.length, resultStrings.length);
assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length);
Arrays.sort(extectedStrings);
Arrays.sort(expectedStrings);
Arrays.sort(resultStrings);
for (int i = 0; i < extectedStrings.length; i++) {
assertEquals(extectedStrings[i], resultStrings[i]);
for (int i = 0; i < expectedStrings.length; i++) {
assertEquals(expectedStrings[i], resultStrings[i]);
}
}
......@@ -510,7 +508,7 @@ public class TestBaseUtils extends TestLogger {
// --------------------------------------------------------------------------------------------
protected static Collection<Object[]> toParameterList(Configuration ... testConfigs) {
ArrayList<Object[]> configs = new ArrayList<Object[]>();
ArrayList<Object[]> configs = new ArrayList<>();
for (Configuration testConfig : testConfigs) {
Object[] c = { testConfig };
configs.add(c);
......@@ -519,7 +517,7 @@ public class TestBaseUtils extends TestLogger {
}
protected static Collection<Object[]> toParameterList(List<Configuration> testConfigs) {
LinkedList<Object[]> configs = new LinkedList<Object[]>();
LinkedList<Object[]> configs = new LinkedList<>();
for (Configuration testConfig : testConfigs) {
Object[] c = { testConfig };
configs.add(c);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册