diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java index 281439e736d7fba95fc8fc4a469b8625a9495cb6..0745d73698d95030b09aba62e9c7efa405c89404 100644 --- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java +++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/TransitiveClosureNaive.java @@ -19,6 +19,7 @@ package org.apache.flink.example.java.graph; +import org.apache.flink.api.common.functions.CoGroupFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.JoinFunction; import org.apache.flink.api.java.DataSet; @@ -30,6 +31,9 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.example.java.graph.util.ConnectedComponentsData; import org.apache.flink.util.Collector; +import java.util.HashSet; +import java.util.Set; + @SuppressWarnings("serial") public class TransitiveClosureNaive implements ProgramDescription { @@ -73,6 +77,24 @@ public class TransitiveClosureNaive implements ProgramDescription { } }); + DataSet> newPaths = paths + .coGroup(nextPaths) + .where(0).equalTo(0) + .with(new CoGroupFunction, Tuple2, Tuple2>() { + Set prevSet = new HashSet>(); + @Override + public void coGroup(Iterable> prevPaths, Iterable> nextPaths, Collector> out) throws Exception { + for (Tuple2 prev : prevPaths) { + prevSet.add(prev); + } + for (Tuple2 next: nextPaths) { + if (!prevSet.contains(next)) { + out.collect(next); + } + } + } + }); + DataSet> transitiveClosure = paths.closeWith(nextPaths); diff --git a/flink-examples/flink-scala-examples/pom.xml b/flink-examples/flink-scala-examples/pom.xml index a6801f87046ed3993b19524ca5351fbbc7695d73..3b50c8b715badda1538a6edd70c09bd5ccd055f5 100644 --- a/flink-examples/flink-scala-examples/pom.xml +++ b/flink-examples/flink-scala-examples/pom.xml @@ -281,7 +281,29 @@ under the License. --> - + + + TransitiveClosureNaive + package + + jar + + + + TransitiveClosureNaive + + + + org.apache.flink.examples.scala.graph.TransitiveClosureNaive + + + + + **/wordcount/TransitiveClosureNaive*.class + + + + diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala index 858ce30258852b1e427e26cd719824e43153290c..86d83db48da80988db50d8bac2d145f7a98a1f9d 100644 --- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala +++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/TransitiveClosureNaive.scala @@ -1,3 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.examples.scala.graph + +import org.apache.flink.api.scala._ +import org.apache.flink.example.java.graph.util.ConnectedComponentsData +import org.apache.flink.util.Collector + +object TransitiveClosureNaive { + + + def main (args: Array[String]): Unit = { + if (!parseParameters(args)) { + return + } + + val env = ExecutionEnvironment.getExecutionEnvironment + + val edges = getEdgesDataSet(env) + + val paths = edges.iterateWithTermination(maxIterations) { prevPaths: DataSet[(Long,Long)] => + + val nextPaths = prevPaths + .join(edges) + .where(1).equalTo(0) { + (left,right) => Some((left._1,right._2)) + } + .union(prevPaths) + .groupBy(0,1) + .reduce((l,r) => l) + + val terminate = prevPaths + .coGroup(nextPaths) + .where(0).equalTo(0) { + (prev, next, out: Collector[(Long, Long)]) => { + val prevPaths = prev.toList + for (n <- next) + if (!prevPaths.contains(n)) + out.collect(n) + } + } + (nextPaths, terminate) + } + + if (fileOutput) + paths.writeAsCsv(outputPath, "\n", " ") + else + paths.print() + + env.execute("Scala Transitive Closure Example") + + + } + + + private var fileOutput: Boolean = false + private var edgesPath: String = null + private var outputPath: String = null + private var maxIterations: Int = 10 + + private def parseParameters(programArguments: Array[String]): Boolean = { + if (programArguments.length > 0) { + fileOutput = true + if (programArguments.length == 3) { + edgesPath = programArguments(0) + outputPath = programArguments(1) + maxIterations = Integer.parseInt(programArguments(2)) + } + else { + System.err.println("Usage: TransitiveClosure ") + return false + } + } + else { + System.out.println("Executing TransitiveClosure example with default parameters and built-in default data.") + System.out.println(" Provide parameters to read input data from files.") + System.out.println(" See the documentation for the correct format of input files.") + System.out.println(" Usage: TransitiveClosure ") + } + return true + } + + private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[(Long, Long)] = { + if (fileOutput) { + env.readCsvFile[(Long, Long)]( + edgesPath, + fieldDelimiter = ' ', + includedFields = Array(0, 1)) + .map { x => (x._1, x._2)} + } + else { + val edgeData = ConnectedComponentsData.EDGES map { + case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long]) + } + env.fromCollection(edgeData) + } + } +} + + + + + + ///** // * Licensed to the Apache Software Foundation (ASF) under one // * or more contributor license agreements. See the NOTICE file diff --git a/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java new file mode 100644 index 0000000000000000000000000000000000000000..1bf25a6d65d85f5ef3eb755cd9719a0007867af2 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/exampleScalaPrograms/TransitiveClosureITCase.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.flink.test.exampleScalaPrograms; + + +import java.io.BufferedReader; + +import org.apache.flink.examples.scala.graph.TransitiveClosureNaive; +import org.apache.flink.test.testdata.ConnectedComponentsData; +import org.apache.flink.test.testdata.TransitiveClosureData; +import org.apache.flink.test.util.JavaProgramTestBase; + +public class TransitiveClosureITCase extends JavaProgramTestBase { + + private static final long SEED = 0xBADC0FFEEBEEFL; + + private static final int NUM_VERTICES = 1000; + + private static final int NUM_EDGES = 10000; + + private String edgesPath; + private String resultPath; + + + @Override + protected void preSubmit() throws Exception { + edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED)); + resultPath = getTempFilePath("results"); + } + + @Override + protected void testProgram() throws Exception { + TransitiveClosureNaive.main(new String [] {edgesPath, resultPath, "5"}); + } + + @Override + protected void postSubmit() throws Exception { + for (BufferedReader reader : getResultReader(resultPath)) { + TransitiveClosureData.checkOddEvenResult(reader); + } + } +} +