Commit d0e1d635 authored by Stephan Ewen, committed by Robert Metzger

[FLINK-3195] [examples] Consolidate batch examples into one project, unify...

[FLINK-3195] [examples] Consolidate batch examples into one project, unify batch and streaming examples under one parent project
Parent 62938c11
......@@ -46,47 +46,47 @@ The command line can be used to
- Run example program with no arguments.
./bin/flink run ./examples/WordCount.jar
./bin/flink run ./examples/batch/WordCount.jar
- Run example program with arguments for input and result files
./bin/flink run ./examples/WordCount.jar \
./bin/flink run ./examples/batch/WordCount.jar \
file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program with parallelism 16 and arguments for input and result files
./bin/flink run -p 16 ./examples/WordCount.jar \
./bin/flink run -p 16 ./examples/batch/WordCount.jar \
file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program with flink log output disabled
./bin/flink run -q ./examples/WordCount.jar
./bin/flink run -q ./examples/batch/WordCount.jar
- Run example program in detached mode
./bin/flink run -d ./examples/WordCount.jar
./bin/flink run -d ./examples/batch/WordCount.jar
- Run example program on a specific JobManager:
./bin/flink run -m myJMHost:6123 \
./examples/WordCount.jar \
./examples/batch/WordCount.jar \
file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program with a specific class as an entry point:
./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
./examples/WordCount.jar \
./examples/batch/WordCount.jar \
file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program using a [per-job YARN cluster]({{site.baseurl}}/setup/yarn_setup.html#run-a-single-flink-job-on-hadoop-yarn) with 2 TaskManagers:
./bin/flink run -m yarn-cluster -yn 2 \
./examples/WordCount.jar \
./examples/batch/WordCount.jar \
hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
- Display the optimized execution plan for the WordCount example program as JSON:
./bin/flink info ./examples/WordCount.jar \
./bin/flink info ./examples/batch/WordCount.jar \
file:///home/user/hamlet.txt file:///home/user/wordcount_out
- List scheduled and running jobs (including their JobIDs):
......
......@@ -42,7 +42,7 @@ Each binary release of Flink contains an `examples` directory with jar files for
To run the WordCount example, issue the following command:
~~~bash
./bin/flink run ./examples/WordCount.jar
./bin/flink run ./examples/batch/WordCount.jar
~~~
The other examples can be started in a similar way.
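With this change, the example jars move from a flat `examples/` directory into separate `batch/` and `streaming/` subdirectories. A minimal sketch of the resulting layout (dummy files, hypothetical jar names):

```shell
# Recreate the post-change examples layout with placeholder jars (illustration only).
mkdir -p flink-dist/examples/batch flink-dist/examples/streaming
touch flink-dist/examples/batch/WordCount.jar
touch flink-dist/examples/batch/KMeans.jar
touch flink-dist/examples/streaming/WordCount.jar

# List the batch examples, as a user would before picking one to run:
ls flink-dist/examples/batch
```

Note that a batch and a streaming example may share the same short jar name; the subdirectory is what disambiguates them.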
......@@ -50,7 +50,7 @@ The other examples can be started in a similar way.
Note that many examples run without passing any arguments, by using built-in data. To run WordCount with real data, you have to pass the path to the data:
~~~bash
./bin/flink run ./examples/WordCount.jar /path/to/some/text/data /path/to/result
./bin/flink run ./examples/batch/WordCount.jar /path/to/some/text/data /path/to/result
~~~
Note that non-local file systems require a scheme prefix, such as `hdfs://`.
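The scheme prefix is what selects the file system implementation; a paths without a prefix is treated as local. A small shell sketch of this dispatch (illustration only, not Flink code):

```shell
# Extract the URI scheme from a path; paths without "://" default to the local file system.
get_scheme() {
  case "$1" in
    *://*) printf '%s\n' "${1%%://*}" ;;   # keep everything before the first "://"
    *)     printf 'file\n' ;;              # no prefix: assume local
  esac
}

get_scheme "hdfs:///user/hamlet.txt"       # hdfs
get_scheme "file:///home/user/hamlet.txt"  # file
get_scheme "/home/user/hamlet.txt"         # file (no prefix, local)
```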
......
......@@ -95,7 +95,7 @@ To bring up the Flink cluster on Google Compute Engine, execute:
./bdutil shell
cd /home/hadoop/flink-install/bin
./flink run ../examples/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output
./flink run ../examples/batch/WordCount.jar gs://dataflow-samples/shakespeare/othello.txt gs://<bucket_name>/output
## Shut down your cluster
......
......@@ -48,7 +48,7 @@ Once the session has been started, you can submit jobs to the cluster using the
curl -O <flink_hadoop2_download_url>
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/WordCount.jar
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
~~~
## Apache Flink on Hadoop YARN using a YARN Session
......@@ -179,7 +179,7 @@ Use the *run* action to submit a job to YARN. The client is able to determine th
~~~bash
wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
./bin/flink run ./examples/WordCount.jar \
./bin/flink run ./examples/batch/WordCount.jar \
hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
~~~
......@@ -205,7 +205,7 @@ Please note that the client then expects the `-yn` value to be set (number of Ta
***Example:***
~~~bash
./bin/flink run -m yarn-cluster -yn 2 ./examples/WordCount.jar
./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar
~~~
The command line options of the YARN session are also available with the `./bin/flink` tool.
......
......@@ -43,7 +43,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
</dependency>
......@@ -73,7 +73,7 @@ under the License.
<build>
<plugins>
<!-- get default data from flink-java-examples package -->
<!-- get default data from flink-examples-batch package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
......@@ -89,7 +89,7 @@ under the License.
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>false</overWrite>
......
......@@ -85,13 +85,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
</dependency>
......
......@@ -103,7 +103,7 @@ under the License.
<!-- copy *.txt files -->
<fileSet>
<directory>src/main/flink-bin/</directory>
<outputDirectory></outputDirectory>
<outputDirectory/>
<fileMode>0644</fileMode>
<includes>
<include>*.txt</include>
......@@ -113,7 +113,7 @@ under the License.
<!-- copy LICENSE/NOTICE files -->
<fileSet>
<directory>../</directory>
<outputDirectory></outputDirectory>
<outputDirectory/>
<fileMode>0644</fileMode>
<includes>
<include>LICENSE*</include>
......@@ -150,21 +150,31 @@ under the License.
</excludes>
</fileSet>
<!-- copy jar files of java examples -->
<!-- copy jar files of the batch examples -->
<fileSet>
<directory>../flink-examples/flink-java-examples/target</directory>
<outputDirectory>examples</outputDirectory>
<directory>../flink-examples/flink-examples-batch/target</directory>
<outputDirectory>examples/batch</outputDirectory>
<fileMode>0644</fileMode>
<includes>
<include>*.jar</include>
</includes>
<excludes>
<exclude>flink-java-examples*-${project.version}.jar</exclude>
<exclude>original-flink-java-examples*-${project.version}.jar</exclude>
<exclude>flink-java-examples*-${project.version}-sources.jar</exclude>
<exclude>flink-java-examples*-${project.version}-tests.jar</exclude>
<exclude>flink-java-examples*-${project.version}-javadoc.jar</exclude>
<exclude>flink-java-examples*-${project.version}-*.jar</exclude>
<exclude>flink-examples-batch*.jar</exclude>
<exclude>original-flink-examples-batch*.jar</exclude>
</excludes>
</fileSet>
<!-- copy jar files of the streaming examples -->
<fileSet>
<directory>../flink-examples/flink-examples-streaming/target</directory>
<outputDirectory>examples/streaming</outputDirectory>
<fileMode>0644</fileMode>
<includes>
<include>*.jar</include>
</includes>
<excludes>
<exclude>flink-examples-streaming*.jar</exclude>
<exclude>original-flink-examples-streaming*.jar</exclude>
</excludes>
</fileSet>
......
......@@ -28,26 +28,28 @@ under the License.
<relativePath>..</relativePath>
</parent>
<artifactId>flink-scala-examples</artifactId>
<name>flink-scala-examples</name>
<artifactId>flink-examples-batch</artifactId>
<name>flink-examples-batch</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala</artifactId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-scala</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Scala Compiler -->
<plugin>
<groupId>net.alchim31.maven</groupId>
......@@ -63,16 +65,6 @@ under the License.
<goal>compile</goal>
</goals>
</execution>
<!-- Run scala compiler in the process-test-resources phase, so that dependencies on
scala classes can be resolved later in the (Java) test-compile phase -->
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
......@@ -81,7 +73,7 @@ under the License.
</jvmArgs>
</configuration>
</plugin>
<!-- Eclipse Integration -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
......@@ -146,6 +138,7 @@ under the License.
</executions>
</plugin>
<!-- Scala Code Style -->
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
......@@ -170,40 +163,13 @@ under the License.
</configuration>
</plugin>
<!-- get default data from flink-java-examples package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.9</version><!--$NO-MVN-MAN-VER$-->
<executions>
<execution>
<id>unpack</id>
<phase>prepare-package</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>false</overWrite>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
<includes>**/util/*Data*.class</includes>
</artifactItem>
</artifactItems>
</configuration>
</execution>
</executions>
</plugin>
<!-- create the example JAR files -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- KMeans -->
<execution>
<id>KMeans</id>
......@@ -217,13 +183,13 @@ under the License.
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.clustering.KMeans</program-class>
<program-class>org.apache.flink.examples.java.clustering.KMeans</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/clustering/KMeans.class</include>
<include>**/scala/clustering/KMeans$*.class</include>
<include>**/java/clustering/KMeans.class</include>
<include>**/java/clustering/KMeans$*.class</include>
<include>**/java/clustering/util/KMeansDataGenerator.class</include>
<include>**/java/clustering/util/KMeansData.class</include>
</includes>
......@@ -242,13 +208,13 @@ under the License.
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.graph.TransitiveClosureNaive</program-class>
<program-class>org.apache.flink.examples.java.graph.TransitiveClosureNaive</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/graph/TransitiveClosureNaive.class</include>
<include>**/scala/graph/TransitiveClosureNaive$*.class</include>
<include>**/java/graph/TransitiveClosureNaive.class</include>
<include>**/java/graph/TransitiveClosureNaive$*.class</include>
<include>**/java/graph/util/ConnectedComponentsData.class</include>
</includes>
</configuration>
......@@ -266,13 +232,13 @@ under the License.
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.graph.ConnectedComponents</program-class>
<program-class>org.apache.flink.examples.java.graph.ConnectedComponents</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/graph/ConnectedComponents.class</include>
<include>**/scala/graph/ConnectedComponents$*.class</include>
<include>**/java/graph/ConnectedComponents.class</include>
<include>**/java/graph/ConnectedComponents$*.class</include>
<include>**/java/graph/util/ConnectedComponentsData.class</include>
</includes>
</configuration>
......@@ -280,208 +246,155 @@ under the License.
<!-- EnumTriangles Basic -->
<execution>
<id>EnumTrianglesBasic</id>
<id>EnumerateGraphTriangles</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>EnumTrianglesBasic</classifier>
<classifier>EnumerateGraphTriangles</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.graph.EnumTrianglesBasic</program-class>
<program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/graph/EnumTrianglesBasic.class</include>
<include>**/scala/graph/EnumTrianglesBasic$*.class</include>
<include>**/java/graph/EnumTrianglesBasic.class</include>
<include>**/java/graph/EnumTrianglesBasic$*.class</include>
<include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
<include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
<include>**/java/graph/util/EnumTrianglesData.class</include>
</includes>
</configuration>
</execution>
<!-- EnumTriangles Opt -->
<!-- PageRank -->
<execution>
<id>EnumTrianglesOpt</id>
<id>PageRank</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>EnumTrianglesOpt</classifier>
<classifier>PageRank</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.graph.EnumTrianglesOpt</program-class>
<program-class>org.apache.flink.examples.java.graph.PageRank</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/graph/EnumTrianglesOpt.class</include>
<include>**/scala/graph/EnumTrianglesOpt$*.class</include>
<include>**/java/graph/util/EnumTrianglesData.class</include>
<include>**/java/graph/PageRank.class</include>
<include>**/java/graph/PageRank$*.class</include>
<include>**/java/graph/util/PageRankData.class</include>
</includes>
</configuration>
</execution>
<!-- PageRank Basic-->
<!-- WebLogAnalysis -->
<execution>
<id>PageRankBasic</id>
<id>WebLogAnalysis</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>PageRankBasic</classifier>
<classifier>WebLogAnalysis</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.graph.PageRankBasic</program-class>
<program-class>org.apache.flink.examples.java.relational.WebLogAnalysis</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/graph/PageRankBasic.class</include>
<include>**/scala/graph/PageRankBasic$*.class</include>
<include>**/java/graph/util/PageRankData.class</include>
<include>**/java/relational/WebLogAnalysis.class</include>
<include>**/java/relational/WebLogAnalysis$*.class</include>
<include>**/java/relational/util/WebLogData.class</include>
<include>**/java/relational/util/WebLogDataGenerator.class</include>
</includes>
</configuration>
</execution>
<!-- These queries are currently not self-contained -->
<!-- TPC-H Query 10 -->
<!--
<!-- WordCount -->
<execution>
<id>TPCHQuery10</id>
<id>WordCount</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>TPCHQuery10</classifier>
<classifier>WordCount</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.relational.TPCHQuery10</program-class>
<program-class>org.apache.flink.examples.java.wordcount.WordCount</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/relational/TPCHQuery10.class</include>
<include>**/scala/relational/TPCHQuery10$*.class</include>
</includes>
</configuration>
</execution> -->
<!-- TPC-H Query 3 -->
<!--
<execution>
<id>TPCHQuery3</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>TPCHQuery3</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.relational.TPCHQuery3</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/relational/TPCHQuery3.class</include>
<include>**/scala/relational/TPCHQuery3$*.class</include>
<include>**/java/wordcount/WordCount.class</include>
<include>**/java/wordcount/WordCount$*.class</include>
<include>**/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution> -->
<!-- WebLogAnalysis -->
</execution>
<!-- Distributed Copy -->
<execution>
<id>WebLogAnalysis</id>
<id>DistCp</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>WebLogAnalysis</classifier>
<classifier>DistCp</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.relational.WebLogAnalysis</program-class>
<program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/relational/WebLogAnalysis.class</include>
<include>**/scala/relational/WebLogAnalysis$*.class</include>
<include>**/java/relational/util/WebLogData.class</include>
<include>**/java/relational/util/WebLogDataGenerator.class</include>
<include>**/java/distcp/*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<!-- WordCount -->
<!--simplify the name of example JARs for build-target/examples -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>WordCount</id>
<id>rename</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
<goal>run</goal>
</goals>
<configuration>
<classifier>WordCount</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.scala.wordcount.WordCount</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/scala/wordcount/WordCount.class</include>
<include>**/scala/wordcount/WordCount$*.class</include>
<include>**/java/wordcount/util/WordCountData.class</include>
</includes>
<configuration>
<target>
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" />
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" />
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-EnumerateGraphTriangles.jar" tofile="${project.basedir}/target/EnumerateGraphTriangles.jar" />
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-PageRank.jar" tofile="${project.basedir}/target/PageRank.jar" />
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" />
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
<copy file="${project.basedir}/target/flink-examples-batch-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
</target>
</configuration>
</execution>
</executions>
</plugin>
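The antrun `rename` execution above simply copies each classifier jar to a short name that the distribution assembly then picks up. The same effect, sketched with plain `cp` on dummy files (hypothetical version number):

```shell
# Simulate the rename step: copy "<artifact>-<version>-<Example>.jar" to "<Example>.jar".
mkdir -p target
version="1.0-SNAPSHOT"   # hypothetical project version
for example in KMeans ConnectedComponents WordCount; do
  touch "target/flink-examples-batch-${version}-${example}.jar"
  cp "target/flink-examples-batch-${version}-${example}.jar" "target/${example}.jar"
done
ls target
```

Using `copy` rather than `move` keeps the original classifier jars in `target/`, so the short names are additions, not replacements.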
</plugins>
<pluginManagement>
<plugins>
<!-- This plugin's configuration is used to store Eclipse m2e settings only.
It has no influence on the Maven build itself and simply suppresses errors in Eclipse.-->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<versionRange>[2.9,)</versionRange>
<goals>
<goal>unpack</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
......@@ -64,7 +64,7 @@ import org.apache.flink.api.java.operators.IterativeDataSet;
*
* <p>
* Usage: <code>KMeans &lt;points path&gt; &lt;centers path&gt; &lt;result path&gt; &lt;num iterations&gt;</code><br>
* If no parameters are provided, the program is run with default data from {@link KMeansData} and 10 iterations.
* If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.clustering.util.KMeansData} and 10 iterations.
*
* <p>
* This example shows how to use:
......@@ -340,5 +340,4 @@ public class KMeans {
return KMeansData.getDefaultCentroidDataSet(env);
}
}
}
......@@ -16,7 +16,6 @@
* limitations under the License.
*/
package org.apache.flink.examples.java.distcp;
import org.apache.flink.api.common.io.InputFormat;
......@@ -24,6 +23,7 @@ import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......
......@@ -30,11 +30,11 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
/**
* An implementation of the connected components algorithm, using a delta iteration.
......@@ -63,7 +63,7 @@ import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
*
* <p>
* Usage: <code>ConnectedComponents &lt;vertices path&gt; &lt;edges path&gt; &lt;result path&gt; &lt;max number of iterations&gt;</code><br>
* If no parameters are provided, the program is run with default data from {@link ConnectedComponentsData} and 10 iterations.
* If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.ConnectedComponentsData} and 10 iterations.
*
* <p>
* This example shows how to use:
......
......@@ -18,22 +18,22 @@
package org.apache.flink.examples.java.graph;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
......
......@@ -18,25 +18,23 @@
package org.apache.flink.examples.java.graph;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.util.EnumTrianglesData;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.EdgeWithDegrees;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
......@@ -70,7 +68,7 @@ import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Triad;
* </pre>
*
* Usage: <code>EnumTriangleOpt &lt;edge path&gt; &lt;result path&gt;</code><br>
* If no parameters are provided, the program is run with default data from {@link EnumTrianglesData}.
* If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.EnumTrianglesData}.
*
* <p>
* This example shows how to use:
......@@ -97,26 +95,26 @@ public class EnumTrianglesOpt {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// read input data
DataSet<Edge> edges = getEdgeDataSet(env);
DataSet<EnumTrianglesDataTypes.Edge> edges = getEdgeDataSet(env);
// annotate edges with degrees
DataSet<EdgeWithDegrees> edgesWithDegrees = edges
DataSet<EnumTrianglesDataTypes.EdgeWithDegrees> edgesWithDegrees = edges
.flatMap(new EdgeDuplicator())
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter())
.groupBy(EdgeWithDegrees.V1,EdgeWithDegrees.V2).reduce(new DegreeJoiner());
.groupBy(EnumTrianglesDataTypes.Edge.V1).sortGroup(EnumTrianglesDataTypes.Edge.V2, Order.ASCENDING).reduceGroup(new DegreeCounter())
.groupBy(EnumTrianglesDataTypes.EdgeWithDegrees.V1, EnumTrianglesDataTypes.EdgeWithDegrees.V2).reduce(new DegreeJoiner());
// project edges by degrees
DataSet<Edge> edgesByDegree = edgesWithDegrees
DataSet<EnumTrianglesDataTypes.Edge> edgesByDegree = edgesWithDegrees
.map(new EdgeByDegreeProjector());
// project edges by vertex id
DataSet<Edge> edgesById = edgesByDegree
DataSet<EnumTrianglesDataTypes.Edge> edgesById = edgesByDegree
.map(new EdgeByIdProjector());
DataSet<Triad> triangles = edgesByDegree
DataSet<EnumTrianglesDataTypes.Triad> triangles = edgesByDegree
// build triads
.groupBy(Edge.V1).sortGroup(Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
.groupBy(EnumTrianglesDataTypes.Edge.V1).sortGroup(EnumTrianglesDataTypes.Edge.V2, Order.ASCENDING).reduceGroup(new TriadBuilder())
// filter triads
.join(edgesById).where(Triad.V2,Triad.V3).equalTo(Edge.V1,Edge.V2).with(new TriadFilter());
.join(edgesById).where(EnumTrianglesDataTypes.Triad.V2, EnumTrianglesDataTypes.Triad.V3).equalTo(EnumTrianglesDataTypes.Edge.V1, EnumTrianglesDataTypes.Edge.V2).with(new TriadFilter());
// emit result
if(fileOutput) {
......@@ -136,21 +134,21 @@ public class EnumTrianglesOpt {
/** Converts a Tuple2 into an Edge */
@ForwardedFields("0;1")
public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, Edge> {
private final Edge outEdge = new Edge();
public static class TupleEdgeConverter implements MapFunction<Tuple2<Integer, Integer>, EnumTrianglesDataTypes.Edge> {
private final EnumTrianglesDataTypes.Edge outEdge = new EnumTrianglesDataTypes.Edge();
@Override
public Edge map(Tuple2<Integer, Integer> t) throws Exception {
public EnumTrianglesDataTypes.Edge map(Tuple2<Integer, Integer> t) throws Exception {
outEdge.copyVerticesFromTuple2(t);
return outEdge;
}
}
/** Emits for an edge the original edge and its switched version. */
private static class EdgeDuplicator implements FlatMapFunction<Edge, Edge> {
private static class EdgeDuplicator implements FlatMapFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Edge> {
@Override
public void flatMap(Edge edge, Collector<Edge> out) throws Exception {
public void flatMap(EnumTrianglesDataTypes.Edge edge, Collector<EnumTrianglesDataTypes.Edge> out) throws Exception {
out.collect(edge);
edge.flipVertices();
out.collect(edge);
......@@ -162,19 +160,19 @@ public class EnumTrianglesOpt {
* Emits one edge for each input edge with a degree annotation for the shared vertex.
* For each emitted edge, the first vertex is the vertex with the smaller id.
*/
private static class DegreeCounter implements GroupReduceFunction<Edge, EdgeWithDegrees> {
private static class DegreeCounter implements GroupReduceFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.EdgeWithDegrees> {
final ArrayList<Integer> otherVertices = new ArrayList<Integer>();
final EdgeWithDegrees outputEdge = new EdgeWithDegrees();
final EnumTrianglesDataTypes.EdgeWithDegrees outputEdge = new EnumTrianglesDataTypes.EdgeWithDegrees();
@Override
public void reduce(Iterable<Edge> edgesIter, Collector<EdgeWithDegrees> out) {
public void reduce(Iterable<EnumTrianglesDataTypes.Edge> edgesIter, Collector<EnumTrianglesDataTypes.EdgeWithDegrees> out) {
Iterator<Edge> edges = edgesIter.iterator();
Iterator<EnumTrianglesDataTypes.Edge> edges = edgesIter.iterator();
otherVertices.clear();
// get first edge
Edge edge = edges.next();
EnumTrianglesDataTypes.Edge edge = edges.next();
Integer groupVertex = edge.getFirstVertex();
this.otherVertices.add(edge.getSecondVertex());
......@@ -212,11 +210,11 @@ public class EnumTrianglesOpt {
* degree annotation.
*/
@ForwardedFields("0;1")
private static class DegreeJoiner implements ReduceFunction<EdgeWithDegrees> {
private final EdgeWithDegrees outEdge = new EdgeWithDegrees();
private static class DegreeJoiner implements ReduceFunction<EnumTrianglesDataTypes.EdgeWithDegrees> {
private final EnumTrianglesDataTypes.EdgeWithDegrees outEdge = new EnumTrianglesDataTypes.EdgeWithDegrees();
@Override
public EdgeWithDegrees reduce(EdgeWithDegrees edge1, EdgeWithDegrees edge2) throws Exception {
public EnumTrianglesDataTypes.EdgeWithDegrees reduce(EnumTrianglesDataTypes.EdgeWithDegrees edge1, EnumTrianglesDataTypes.EdgeWithDegrees edge2) throws Exception {
// copy first edge
outEdge.copyFrom(edge1);
......@@ -232,12 +230,12 @@ public class EnumTrianglesOpt {
}
/** Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree. */
private static class EdgeByDegreeProjector implements MapFunction<EdgeWithDegrees, Edge> {
private static class EdgeByDegreeProjector implements MapFunction<EnumTrianglesDataTypes.EdgeWithDegrees, EnumTrianglesDataTypes.Edge> {
private final Edge outEdge = new Edge();
private final EnumTrianglesDataTypes.Edge outEdge = new EnumTrianglesDataTypes.Edge();
@Override
public Edge map(EdgeWithDegrees inEdge) throws Exception {
public EnumTrianglesDataTypes.Edge map(EnumTrianglesDataTypes.EdgeWithDegrees inEdge) throws Exception {
// copy vertices to simple edge
outEdge.copyVerticesFromEdgeWithDegrees(inEdge);
......@@ -253,10 +251,10 @@ public class EnumTrianglesOpt {
}
/** Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second. */
private static class EdgeByIdProjector implements MapFunction<Edge, Edge> {
private static class EdgeByIdProjector implements MapFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Edge> {
@Override
public Edge map(Edge inEdge) throws Exception {
public EnumTrianglesDataTypes.Edge map(EnumTrianglesDataTypes.Edge inEdge) throws Exception {
// flip vertices if necessary
if(inEdge.getFirstVertex() > inEdge.getSecondVertex()) {
@@ -273,20 +271,20 @@ public class EnumTrianglesOpt {
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
*/
@ForwardedFields("0")
private static class TriadBuilder implements GroupReduceFunction<Edge, Triad> {
private static class TriadBuilder implements GroupReduceFunction<EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Triad> {
private final List<Integer> vertices = new ArrayList<Integer>();
private final Triad outTriad = new Triad();
private final EnumTrianglesDataTypes.Triad outTriad = new EnumTrianglesDataTypes.Triad();
@Override
public void reduce(Iterable<Edge> edgesIter, Collector<Triad> out) throws Exception {
final Iterator<Edge> edges = edgesIter.iterator();
public void reduce(Iterable<EnumTrianglesDataTypes.Edge> edgesIter, Collector<EnumTrianglesDataTypes.Triad> out) throws Exception {
final Iterator<EnumTrianglesDataTypes.Edge> edges = edgesIter.iterator();
// clear vertex list
vertices.clear();
// read first edge
Edge firstEdge = edges.next();
EnumTrianglesDataTypes.Edge firstEdge = edges.next();
outTriad.setFirstVertex(firstEdge.getFirstVertex());
vertices.add(firstEdge.getSecondVertex());
@@ -306,10 +304,10 @@ public class EnumTrianglesOpt {
}
/** Filters triads (three vertices connected by two edges) without a closing third edge. */
private static class TriadFilter implements JoinFunction<Triad, Edge, Triad> {
private static class TriadFilter implements JoinFunction<EnumTrianglesDataTypes.Triad, EnumTrianglesDataTypes.Edge, EnumTrianglesDataTypes.Triad> {
@Override
public Triad join(Triad triad, Edge edge) throws Exception {
public EnumTrianglesDataTypes.Triad join(EnumTrianglesDataTypes.Triad triad, EnumTrianglesDataTypes.Edge edge) throws Exception {
return triad;
}
}
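Taken together, TriadBuilder pairs every two neighbors of a shared first vertex into a candidate triad, and TriadFilter keeps only candidates closed by a third edge. The pairing logic can be sketched in plain Java, without Flink types (the class and method names below are illustrative, not part of the example):

```java
import java.util.ArrayList;
import java.util.List;

class TriadSketch {

    /**
     * Given edges that all share the same first vertex, with their second
     * vertices sorted ascending, emit every pair of second vertices as a
     * candidate triad (v, a, b) with a < b -- mirroring TriadBuilder.
     */
    static List<int[]> buildTriads(int firstVertex, int[] secondVertices) {
        List<int[]> triads = new ArrayList<>();
        for (int i = 0; i < secondVertices.length; i++) {
            for (int j = i + 1; j < secondVertices.length; j++) {
                // each pair of neighbors forms a triad candidate;
                // a later join against the edge set closes or drops it
                triads.add(new int[] { firstVertex, secondVertices[i], secondVertices[j] });
            }
        }
        return triads;
    }
}
```

For a vertex with k neighbors this emits k·(k−1)/2 candidates, which is why the optimized variant hashes edges onto the lower-degree vertex first.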
@@ -343,7 +341,7 @@ public class EnumTrianglesOpt {
return true;
}
private static DataSet<Edge> getEdgeDataSet(ExecutionEnvironment env) {
private static DataSet<EnumTrianglesDataTypes.Edge> getEdgeDataSet(ExecutionEnvironment env) {
if(fileOutput) {
return env.readCsvFile(edgePath)
.fieldDelimiter(" ")
@@ -18,22 +18,16 @@
package org.apache.flink.examples.java.graph;
import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.util.PageRankData;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
/**
* A basic implementation of the Page Rank algorithm using a bulk iteration.
@@ -59,7 +59,7 @@ import org.apache.flink.examples.java.graph.util.PageRankData;
*
* <p>
* Usage: <code>PageRankBasic &lt;pages path&gt; &lt;links path&gt; &lt;output path&gt; &lt;num pages&gt; &lt;num iterations&gt;</code><br>
* If no parameters are provided, the program is run with default data from {@link PageRankData} and 10 iterations.
* If no parameters are provided, the program is run with default data from {@link org.apache.flink.examples.java.graph.util.PageRankData} and 10 iterations.
*
* <p>
* This example shows how to use:
@@ -67,12 +67,10 @@ import org.apache.flink.examples.java.graph.util.PageRankData;
* <li>Bulk Iterations
* <li>Default Join
* <li>Configure user-defined functions using constructor parameters.
* </ul>
*
*
* </ul>
*/
@SuppressWarnings("serial")
public class PageRankBasic {
public class PageRank {
private static final double DAMPENING_FACTOR = 0.85;
private static final double EPSILON = 0.0001;
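The two constants above parameterize the bulk iteration: each step applies the standard update rank(p) = (1 − d)/N + d · Σ rank(q)/out(q) over the pages q linking to p, until ranks change by less than EPSILON. One synchronous update can be sketched in plain Java (illustrative names, no Flink types; the sketch assumes every page has at least one out-link, so there is no dangling-node handling):

```java
class PageRankStep {

    static final double D = 0.85;  // dampening factor, as in the example

    /**
     * One synchronous PageRank update. ranks[i] is the current rank of
     * page i; adj[i] lists the pages that page i links to.
     * Returns the next rank vector.
     */
    static double[] step(double[] ranks, int[][] adj) {
        int n = ranks.length;
        double[] next = new double[n];
        java.util.Arrays.fill(next, (1 - D) / n);      // random-jump term
        for (int p = 0; p < n; p++) {
            // page p distributes its dampened rank evenly over its out-links
            double share = D * ranks[p] / adj[p].length;
            for (int q : adj[p]) {
                next[q] += share;
            }
        }
        return next;
    }
}
```

A fixed point of this update (e.g. two pages linking only to each other, each at rank 0.5) stays unchanged, which is the termination condition the EPSILON check approximates.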
@@ -16,16 +16,15 @@
* limitations under the License.
*/
package org.apache.flink.examples.java.graph;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
import org.apache.flink.util.Collector;
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.graph.util.EnumTrianglesDataTypes.Edge;
/**
* Provides the default data sets used for the Triangle Enumeration example programs.
@@ -47,11 +46,11 @@ public class EnumTrianglesData {
{7, 8}
};
public static DataSet<Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
public static DataSet<EnumTrianglesDataTypes.Edge> getDefaultEdgeDataSet(ExecutionEnvironment env) {
List<Edge> edges = new ArrayList<Edge>();
List<EnumTrianglesDataTypes.Edge> edges = new ArrayList<EnumTrianglesDataTypes.Edge>();
for(Object[] e : EDGES) {
edges.add(new Edge((Integer)e[0], (Integer)e[1]));
edges.add(new EnumTrianglesDataTypes.Edge((Integer)e[0], (Integer)e[1]));
}
return env.fromCollection(edges);
@@ -89,7 +89,7 @@ public class CollectionExecutionExample {
List<Tuple2<User,EMail>> result = joined.collect();
// Do some work with the resulting ArrayList (=Collection).
for(Tuple2<User, EMail> t : result) {
for (Tuple2<User, EMail> t : result) {
System.err.println("Result = " + t);
}
}
@@ -20,8 +20,7 @@ package org.apache.flink.examples.java.ml.util;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.ml.LinearRegression.Data;
import org.apache.flink.examples.java.ml.LinearRegression.Params;
import org.apache.flink.examples.java.ml.LinearRegression;
import java.util.LinkedList;
import java.util.List;
@@ -51,20 +50,20 @@ public class LinearRegressionData {
new Object[] { 0.28, 0.57 }, new Object[] { 1.65, 3.30 },
new Object[] { -0.55, -1.08 } };
public static DataSet<Params> getDefaultParamsDataSet(
public static DataSet<LinearRegression.Params> getDefaultParamsDataSet(
ExecutionEnvironment env) {
List<Params> paramsList = new LinkedList<Params>();
List<LinearRegression.Params> paramsList = new LinkedList<LinearRegression.Params>();
for (Object[] params : PARAMS) {
paramsList.add(new Params((Double) params[0], (Double) params[1]));
paramsList.add(new LinearRegression.Params((Double) params[0], (Double) params[1]));
}
return env.fromCollection(paramsList);
}
public static DataSet<Data> getDefaultDataDataSet(ExecutionEnvironment env) {
public static DataSet<LinearRegression.Data> getDefaultDataDataSet(ExecutionEnvironment env) {
List<Data> dataList = new LinkedList<Data>();
List<LinearRegression.Data> dataList = new LinkedList<LinearRegression.Data>();
for (Object[] data : DATA) {
dataList.add(new Data((Double) data[0], (Double) data[1]));
dataList.add(new LinearRegression.Data((Double) data[0], (Double) data[1]));
}
return env.fromCollection(dataList);
}
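These Params (theta0, theta1) and Data (x, y) defaults feed LinearRegression, which fits y = theta0 + theta1·x by batch gradient descent. A single update step can be sketched in plain Java (illustrative names, independent of the example's Flink operators):

```java
class LinRegStep {

    /**
     * One batch gradient-descent step for y = t0 + t1 * x under squared
     * error. data holds {x, y} pairs; lr is the learning rate.
     * Returns the updated {t0, t1}.
     */
    static double[] step(double t0, double t1, double[][] data, double lr) {
        double g0 = 0, g1 = 0;
        for (double[] d : data) {
            double err = (t0 + t1 * d[0]) - d[1];  // prediction minus target
            g0 += err;            // gradient w.r.t. the intercept
            g1 += err * d[0];     // gradient w.r.t. the slope
        }
        int n = data.length;
        return new double[] { t0 - lr * g0 / n, t1 - lr * g1 / n };
    }
}
```

On data that already fits the model exactly, the gradient is zero and the parameters are returned unchanged.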
@@ -23,6 +23,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.java.DataSet;
@@ -18,18 +18,16 @@
package org.apache.flink.examples.java.relational;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.util.Collector;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.examples.java.relational.util.WebLogData;
import org.apache.flink.examples.java.relational.util.WebLogDataGenerator;
import org.apache.flink.util.Collector;
/**
* This program processes web logs and relational data.
@@ -54,7 +52,7 @@ import org.apache.flink.examples.java.relational.util.WebLogDataGenerator;
*
* <p>
* Input files are plain text CSV files using the pipe character ('|') as field separator.
* The tables referenced in the query can be generated using the {@link WebLogDataGenerator} and
* The tables referenced in the query can be generated using the {@link org.apache.flink.examples.java.relational.util.WebLogDataGenerator} and
* have the following schemas
* <pre>{@code
* CREATE TABLE Documents (
@@ -54,7 +54,7 @@ public class WordCount {
public static void main(String[] args) throws Exception {
if(!parseParameters(args)) {
if (!parseParameters(args)) {
return;
}
@@ -72,7 +72,7 @@ public class WordCount {
.sum(1);
// emit result
if(fileOutput) {
if (fileOutput) {
counts.writeAsCsv(outputPath, "\n", " ");
// execute program
env.execute("WordCount Example");
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.java.wordcount;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -25,14 +26,12 @@ import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.util.Collector;
/**
* This example shows an implementation of Wordcount without using the
* This example shows an implementation of WordCount without using the
* Tuple2 type, using a custom class instead.
*
*/
@SuppressWarnings("serial")
public class PojoExample {
public class WordCountPojo {
/**
* This is the POJO (Plain Old Java Object) that is being used
@@ -40,31 +39,36 @@ public class PojoExample {
* As long as all fields are public or have a getter/setter, the system can handle them
*/
public static class Word {
// fields
private String word;
private Integer frequency;
private int frequency;
// constructors
public Word() {
}
public Word() {}
public Word(String word, int i) {
this.word = word;
this.frequency = i;
}
// getters setters
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Integer getFrequency() {
public int getFrequency() {
return frequency;
}
public void setFrequency(Integer frequency) {
public void setFrequency(int frequency) {
this.frequency = frequency;
}
// to String
@Override
public String toString() {
return "Word="+word+" freq="+frequency;
@@ -72,8 +76,7 @@ public class PojoExample {
}
public static void main(String[] args) throws Exception {
if(!parseParameters(args)) {
if (!parseParameters(args)) {
return;
}
@@ -95,7 +98,7 @@ public class PojoExample {
}
});
if(fileOutput) {
if (fileOutput) {
counts.writeAsText(outputPath, WriteMode.OVERWRITE);
// execute program
env.execute("WordCount-Pojo Example");
@@ -112,10 +115,9 @@ public class PojoExample {
/**
* Implements the string tokenizer that splits sentences into words as a user-defined
* FlatMapFunction. The function takes a line (String) and splits it into
* multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
* multiple Word objects.
*/
public static final class Tokenizer implements FlatMapFunction<String, Word> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Word> out) {
@@ -141,10 +143,10 @@ public class PojoExample {
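The Tokenizer's flatMap body is collapsed in this hunk; the splitting it performs in the WordCount examples -- lowercase the line, split on runs of non-word characters, drop empty tokens -- can be sketched without Flink types (names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

class TokenizerSketch {

    /** Mirrors the examples' tokenization: lowercase, split on non-word runs. */
    static List<String> tokenize(String line) {
        List<String> words = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {   // split can yield a leading empty token
                words.add(token);
            }
        }
        return words;
    }
}
```

In the POJO variant each token would then be wrapped as `new Word(token, 1)` and summed by the "word" field rather than by tuple position.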
private static boolean parseParameters(String[] args) {
if(args.length > 0) {
if (args.length > 0) {
// parse input arguments
fileOutput = true;
if(args.length == 2) {
if (args.length == 2) {
textPath = args[0];
outputPath = args[1];
} else {
@@ -160,7 +162,7 @@ public class PojoExample {
}
private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
if(fileOutput) {
if (fileOutput) {
// read the text file from given input path
return env.readTextFile(textPath);
} else {
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.scala._
@@ -15,11 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import org.apache.flink.core.fs.FileSystem.WriteMode
object DeltaPageRank {
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
@@ -28,7 +29,6 @@ import org.apache.flink.api.common.operators.Order
import scala.collection.mutable
/**
* Triangle enumeration is a pre-processing step to find closely connected parts in graphs.
* A triangle consists of three edges that connect three vertices with each other.
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
@@ -25,7 +26,6 @@ import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.util.Collector
import org.apache.flink.examples.java.graph.util.EnumTrianglesData
import org.apache.flink.api.common.operators.Order
import scala.collection.mutable.MutableList
import scala.collection.mutable
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.graph
import org.apache.flink.api.scala._
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.relational
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector
import org.apache.flink.api.java.aggregation.Aggregations
@@ -15,10 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.relational
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.api.java.aggregation.Aggregations
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.relational
import org.apache.flink.api.scala._
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala.wordcount
import org.apache.flink.api.scala._
@@ -24,13 +24,13 @@ under the License.
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parent</artifactId>
<artifactId>flink-examples</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-streaming-examples</artifactId>
<name>flink-streaming-examples</name>
<artifactId>flink-examples-streaming</artifactId>
<name>flink-examples-streaming</name>
<packaging>jar</packaging>
@@ -49,7 +49,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
</dependency>
@@ -67,13 +67,6 @@ under the License.
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-tests</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
@@ -85,7 +78,33 @@ under the License.
<build>
<plugins>
<!-- get default data from flink-java-examples package -->
<!-- Scala Code Style -->
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.5.0</version>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
<!-- get default data from flink-examples-batch package -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
@@ -102,7 +121,7 @@ under the License.
<!-- For WordCount example data -->
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>false</overWrite>
@@ -475,28 +494,33 @@ under the License.
</executions>
</plugin>
<!--simplify the name of example JARs for build-target/examples -->
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>0.5.0</version>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>rename</id>
<phase>package</phase>
<goals>
<goal>check</goal>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-IncrementalLearning.jar" tofile="${project.basedir}/target/IncrementalLearning.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-Iteration.jar" tofile="${project.basedir}/target/Iteration.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-SessionWindowing.jar" tofile="${project.basedir}/target/SessionWindowing.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-SocketTextStreamWordCount.jar" tofile="${project.basedir}/target/SocketTextStreamWordCount.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-TopSpeedWindowing.jar" tofile="${project.basedir}/target/TopSpeedWindowing.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-Twitter.jar" tofile="${project.basedir}/target/Twitter.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-WindowJoin.jar" tofile="${project.basedir}/target/WindowJoin.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
<copy file="${project.basedir}/target/flink-examples-streaming-${project.version}-WindowWordCount.jar" tofile="${project.basedir}/target/WindowWordCount.jar" />
</target>
</configuration>
</execution>
</executions>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>${project.basedir}/../tools/maven/scalastyle-config.xml</configLocation>
<outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
<outputEncoding>UTF-8</outputEncoding>
</configuration>
</plugin>
</plugins>
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples</artifactId>
<version>1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-java-examples</artifactId>
<name>flink-java-examples</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<!-- KMeans -->
<execution>
<id>KMeans</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>KMeans</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.clustering.KMeans</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/clustering/KMeans.class</include>
<include>**/java/clustering/KMeans$*.class</include>
<include>**/java/clustering/util/KMeansDataGenerator.class</include>
<include>**/java/clustering/util/KMeansData.class</include>
</includes>
</configuration>
</execution>
<!-- Transitive Closure -->
<execution>
<id>TransitiveClosure</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>TransitiveClosure</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.graph.TransitiveClosureNaive</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/graph/TransitiveClosureNaive.class</include>
<include>**/java/graph/TransitiveClosureNaive$*.class</include>
<include>**/java/graph/util/ConnectedComponentsData.class</include>
</includes>
</configuration>
</execution>
<!-- Connected Components -->
<execution>
<id>ConnectedComponents</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>ConnectedComponents</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.graph.ConnectedComponents</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/graph/ConnectedComponents.class</include>
<include>**/java/graph/ConnectedComponents$*.class</include>
<include>**/java/graph/util/ConnectedComponentsData.class</include>
</includes>
</configuration>
</execution>
<!-- EnumTriangles Basic -->
<execution>
<id>EnumTrianglesBasic</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>EnumTrianglesBasic</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.graph.EnumTrianglesBasic</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/graph/EnumTrianglesBasic.class</include>
<include>**/java/graph/EnumTrianglesBasic$*.class</include>
<include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
<include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
<include>**/java/graph/util/EnumTrianglesData.class</include>
</includes>
</configuration>
</execution>
<!-- EnumTriangles Opt -->
<execution>
<id>EnumTrianglesOpt</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>EnumTrianglesOpt</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.graph.EnumTrianglesOpt</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/graph/EnumTrianglesOpt.class</include>
<include>**/java/graph/EnumTrianglesOpt$*.class</include>
<include>**/java/graph/util/EnumTrianglesDataTypes.class</include>
<include>**/java/graph/util/EnumTrianglesDataTypes$*.class</include>
<include>**/java/graph/util/EnumTrianglesData.class</include>
</includes>
</configuration>
</execution>
<!-- PageRank Basic-->
<execution>
<id>PageRankBasic</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>PageRankBasic</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.graph.PageRankBasic</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/graph/PageRankBasic.class</include>
<include>**/java/graph/PageRankBasic$*.class</include>
<include>**/java/graph/util/PageRankData.class</include>
</includes>
</configuration>
</execution>
<!-- WebLogAnalysis -->
<execution>
<id>WebLogAnalysis</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>WebLogAnalysis</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.relational.WebLogAnalysis</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/relational/WebLogAnalysis.class</include>
<include>**/java/relational/WebLogAnalysis$*.class</include>
<include>**/java/relational/util/WebLogData.class</include>
<include>**/java/relational/util/WebLogDataGenerator.class</include>
</includes>
</configuration>
</execution>
<!-- WordCount -->
<execution>
<id>WordCount</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>WordCount</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.wordcount.WordCount</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/wordcount/WordCount.class</include>
<include>**/java/wordcount/WordCount$*.class</include>
<include>**/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution>
<!-- WordCountPOJO -->
<execution>
<id>WordCountPOJO</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>WordCountPOJO</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.wordcount.PojoExample</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/wordcount/PojoExample.class</include>
<include>**/java/wordcount/PojoExample$*.class</include>
<include>**/java/wordcount/util/WordCountData.class</include>
</includes>
</configuration>
</execution>
<!-- Distributed Copy -->
<execution>
<id>DistCp</id>
<phase>package</phase>
<goals>
<goal>jar</goal>
</goals>
<configuration>
<classifier>DistCp</classifier>
<archive>
<manifestEntries>
<program-class>org.apache.flink.examples.java.distcp.DistCp</program-class>
</manifestEntries>
</archive>
<includes>
<include>**/java/distcp/*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
<!--simplify the name of example JARs for build-target/examples -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>rename</id>
<phase>package</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-KMeans.jar" tofile="${project.basedir}/target/KMeans.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-ConnectedComponents.jar" tofile="${project.basedir}/target/ConnectedComponents.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-EnumTrianglesBasic.jar" tofile="${project.basedir}/target/EnumTrianglesBasic.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-EnumTrianglesOpt.jar" tofile="${project.basedir}/target/EnumTrianglesOpt.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-PageRankBasic.jar" tofile="${project.basedir}/target/PageRankBasic.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-TransitiveClosure.jar" tofile="${project.basedir}/target/TransitiveClosure.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WebLogAnalysis.jar" tofile="${project.basedir}/target/WebLogAnalysis.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCount.jar" tofile="${project.basedir}/target/WordCount.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-WordCountPOJO.jar" tofile="${project.basedir}/target/WordCountPOJO.jar" />
<copy file="${project.basedir}/target/flink-java-examples-${project.version}-DistCp.jar" tofile="${project.basedir}/target/DistCp.jar" />
</target>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.java.wordcount;
import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
/**
* Same as {@link WordCount} but implements the {@link ProgramDescription} interface.
*
* <p>
* The input is a plain text file with lines separated by newline characters.
*
* <p>
* Usage: <code>WordCountProgram [&lt;text path&gt; &lt;result path&gt;]</code><br>
* If no parameters are provided, the program is run with default data from {@link WordCountData}.
*
* <p>
* This example shows:
* <ul>
* <li>how to provide additional information (using the {@link ProgramDescription} interface) that can be displayed by
* Flink clients, i.e., bin/flink and the WebClient</li>
* </ul>
*
*/
public class WordCountMeta extends WordCount implements ProgramDescription {
public static void main(String[] args) throws Exception {
WordCount.main(args);
}
@Override
public String getDescription() {
return "Simple Word-Count Example\n"
+ "Parameters: [<text path> <result path>]\n"
+ "If no parameters are provided, the example will run with built-in default data.";
}
}
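The WordCountMeta class above implements ProgramDescription so that clients such as bin/flink can show usage information without running the job. As a rough, self-contained sketch of that pattern — note that the `Description` interface and `WordCountMetaSketch` class below are local stand-ins invented for illustration, not Flink's actual `org.apache.flink.api.common.ProgramDescription`, so the snippet compiles without a Flink dependency:

```java
public class Main {

    // Local stand-in for Flink's ProgramDescription interface (illustrative only).
    interface Description {
        String getDescription();
    }

    // Mirrors the shape of WordCountMeta: getDescription() supplies the help
    // text that a client can display before (or instead of) launching the job.
    static class WordCountMetaSketch implements Description {
        @Override
        public String getDescription() {
            return "Simple Word-Count Example\n"
                    + "Parameters: [<text path> <result path>]\n"
                    + "If no parameters are provided, the example will run with built-in default data.";
        }
    }

    public static void main(String[] args) {
        // A client (e.g. a command-line tool) queries the description
        // through the interface, without knowing the concrete program class.
        Description program = new WordCountMetaSketch();
        System.out.println(program.getDescription());
    }
}
```

This is why the real class extends WordCount rather than duplicating its logic: the execution path stays in one place, and the subclass only contributes metadata.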
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.examples.scala;
/**
* This dummy class exists only to create an empty
* javadoc.jar in the flink-scala-examples project.
* This is required for passing the maven central sync requirements.
*/
public class Dummy {}
......@@ -47,7 +47,7 @@ under the License.
</dependencies>
<modules>
<module>flink-java-examples</module>
<module>flink-scala-examples</module>
<module>flink-examples-batch</module>
<module>flink-examples-streaming</module>
</modules>
</project>
......@@ -59,7 +59,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
......
......@@ -54,7 +54,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
</dependency>
......
......@@ -85,7 +85,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
<exclusions>
<exclusion>
......
......@@ -102,19 +102,11 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java-examples</artifactId>
<artifactId>flink-examples-batch</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala-examples</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
......
......@@ -22,9 +22,11 @@ import java.io.File;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.flink.examples.java.graph.PageRankBasic;
import org.apache.flink.examples.java.graph.PageRank;
import org.apache.flink.test.testdata.PageRankData;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
......@@ -68,13 +70,13 @@ public class PageRankITCase extends MultipleProgramsTestBase {
@Test
public void testPageRankSmallNumberOfIterations() throws Exception{
PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "3"});
PageRank.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES + "", "3"});
expected = PageRankData.RANKS_AFTER_3_ITERATIONS;
}
@Test
public void testPageRankWithConvergenceCriterion() throws Exception {
PageRankBasic.main(new String[] {verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES+"", "1000"});
PageRank.main(new String[]{verticesPath, edgesPath, resultPath, PageRankData.NUM_VERTICES + "", "1000"});
expected = PageRankData.RANKS_AFTER_EPSILON_0_0001_CONVERGENCE;
}
}
......@@ -31,11 +31,11 @@ import org.apache.flink.optimizer.plan.BulkPartialSolutionPlanNode;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.examples.java.graph.PageRankBasic.BuildOutgoingEdgeList;
import org.apache.flink.examples.java.graph.PageRankBasic.Dampener;
import org.apache.flink.examples.java.graph.PageRankBasic.EpsilonFilter;
import org.apache.flink.examples.java.graph.PageRankBasic.JoinVertexWithEdgesMatch;
import org.apache.flink.examples.java.graph.PageRankBasic.RankAssigner;
import org.apache.flink.examples.java.graph.PageRank.BuildOutgoingEdgeList;
import org.apache.flink.examples.java.graph.PageRank.Dampener;
import org.apache.flink.examples.java.graph.PageRank.EpsilonFilter;
import org.apache.flink.examples.java.graph.PageRank.JoinVertexWithEdgesMatch;
import org.apache.flink.examples.java.graph.PageRank.RankAssigner;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.optimizer.util.CompilerTestBase;
......
......@@ -26,7 +26,7 @@ import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.PageRankBasic;
import org.apache.flink.examples.java.graph.PageRank;
import org.apache.flink.examples.java.relational.TPCHQuery3;
import org.apache.flink.examples.java.relational.WebLogAnalysis;
import org.apache.flink.examples.java.wordcount.WordCount;
......@@ -128,7 +128,7 @@ public class DumpCompiledPlanTest extends CompilerTestBase {
PreviewPlanEnvironment env = new PreviewPlanEnvironment();
env.setAsContext();
try {
PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
PageRank.main(new String[]{IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
// all good.
} catch (Exception e) {
......
......@@ -25,7 +25,7 @@ import org.apache.flink.client.program.OptimizerPlanEnvironment;
import org.apache.flink.client.program.PreviewPlanEnvironment;
import org.apache.flink.examples.java.clustering.KMeans;
import org.apache.flink.examples.java.graph.ConnectedComponents;
import org.apache.flink.examples.java.graph.PageRankBasic;
import org.apache.flink.examples.java.graph.PageRank;
import org.apache.flink.examples.java.relational.TPCHQuery3;
import org.apache.flink.examples.java.wordcount.WordCount;
import org.apache.flink.optimizer.Optimizer;
......@@ -129,7 +129,7 @@ public class PreviewPlanDumpTest extends CompilerTestBase {
PreviewPlanEnvironment env = new PreviewPlanEnvironment();
env.setAsContext();
try {
PageRankBasic.main(new String[] {IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
PageRank.main(new String[]{IN_FILE, IN_FILE, OUT_FILE, "10", "123"});
} catch(OptimizerPlanEnvironment.ProgramAbortException pae) {
// all good.
} catch (Exception e) {
......
This diff has been collapsed.