diff --git a/docs/java_api_examples.md b/docs/examples.md
similarity index 71%
rename from docs/java_api_examples.md
rename to docs/examples.md
index a45b39e88c215f5677dbcad1ea5abbed04ef944c..86f6fe0d2a3fcf9cfedf3e1bdaa7d4bc04c7a4c0 100644
--- a/docs/java_api_examples.md
+++ b/docs/examples.md
@@ -1,5 +1,5 @@
---
-title: "Java API Examples"
+title: "Bundled Examples"
---
* This will be replaced by the TOC
@@ -7,13 +7,17 @@ title: "Java API Examples"
The following example programs showcase different applications of Flink
from simple word counting to graph algorithms. The code samples illustrate the
-use of [Flink's Java API](java_api_guide.html).
+use of [Flink's API](programming_guide.html).
-The full source code of the following and more examples can be found in the __flink-java-examples__ module.
+The full source code of the following and more examples can be found in the __flink-java-examples__
+or __flink-scala-examples__ module.
## Word Count
WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are split into individual words. Second, the words are grouped and counted.
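The two steps can be sketched on plain Scala collections first (a toy illustration only, not the Flink API; the `wordCount` helper below is hypothetical and not part of the bundled examples):

```scala
// Toy sketch of the two WordCount steps on plain Scala collections.
object WordCountSketch {
  def wordCount(texts: Seq[String]): Map[String, Int] =
    texts
      .flatMap(_.toLowerCase.split("\\W+")) // step 1: split texts into words
      .filter(_.nonEmpty)
      .groupBy(identity)                    // step 2: group the words ...
      .map { case (w, ws) => (w, ws.size) } // ... and count each group

  def main(args: Array[String]): Unit =
    println(wordCount(Seq("To be, or not to be")))
}
```

The Flink programs below express the same two steps as data-parallel transformations on a `DataSet`.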
+
+
+
~~~java
// get input data
DataSet<String> text = getTextDataSet(env);
@@ -47,12 +51,38 @@ public static final class Tokenizer extends FlatMapFunction,
+
+
+~~~scala
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// get input data
+val text = getTextDataSet(env)
+
+val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
+ .map { (_, 1) }
+ .groupBy(0)
+ .sum(1)
+
+counts.writeAsCsv(outputPath, "\n", " ")
+~~~
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala "WordCount example" %} implements the algorithm described above with input parameters: `<text path>, <result path>`.
+
+
## Page Rank
The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one page to another. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and computes its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine, which uses the importance of webpages to rank the results of search queries.
In this simple example, PageRank is implemented with a [bulk iteration](iterations.html) and a fixed number of iterations.
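The taxed-sum update for a single page can be sketched in isolation (hypothetical values; the `DAMPENING_FACTOR` and `numPages` names mirror the constants used in the full programs below):

```scala
// Minimal sketch of the taxed-sum rank update applied to one page per iteration.
object RankUpdateSketch {
  val DAMPENING_FACTOR = 0.85
  val numPages = 4

  // received: partial ranks this page collected from its neighbors in the current iteration
  def newRank(received: Seq[Double]): Double =
    DAMPENING_FACTOR * received.sum + (1 - DAMPENING_FACTOR) / numPages

  def main(args: Array[String]): Unit =
    println(newRank(Seq(0.125, 0.0625, 0.25))) // taxed sum of the received partial ranks
}
```

The full programs perform exactly this update inside the iteration, after summing the partial ranks per page.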
+
+
+
~~~java
// get input data
DataSet<Tuple2<Long, Double>> pagesWithRanks = getPagesWithRanksDataSet(env);
@@ -124,6 +154,73 @@ public static final class EpsilonFilter
The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java "PageRank program" %} implements the above example.
It requires the following parameters to run: `<pages path>, <links path>, <output path>, <num pages>, <num iterations>`.
+
+
+~~~scala
+// set up execution environment
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// read input data
+val pages = getPagesDataSet(env)
+val links = getLinksDataSet(env)
+
+// assign initial ranks to pages
+val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
+
+// build adjacency list from link input
+val adjacencyLists = links
+ // initialize lists
+ .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
+ // concatenate lists
+ .groupBy("sourceId").reduce {
+ (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
+ }
+
+// start iteration
+val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+ currentRanks =>
+ val newRanks = currentRanks
+ // distribute ranks to target pages
+ .join(adjacencyLists).where("pageId").equalTo("sourceId") {
+ (page, adjacent, out: Collector[Page]) =>
+ for (targetId <- adjacent.targetIds) {
+ out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
+ }
+ }
+ // collect ranks and sum them up
+ .groupBy("pageId").aggregate(SUM, "rank")
+ // apply dampening factor
+ .map { p =>
+ Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
+ }
+
+ // terminate if no rank update was significant
+ val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
+ (current, next, out: Collector[Int]) =>
+ // check for significant update
+ if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
+ }
+
+ (newRanks, termination)
+}
+
+val result = finalRanks
+
+// emit result
+result.writeAsCsv(outputPath, "\n", " ")
+
+// User-defined types
+case class Link(sourceId: Long, targetId: Long)
+case class Page(pageId: Long, rank: Double)
+case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
+~~~
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala "PageRank program" %} implements the above example.
+It requires the following parameters to run: `<pages path>, <links path>, <output path>, <num pages>, <num iterations>`.
+
+
Input files are plain text files and must be formatted as follows:
- Pages represented as a (long) ID, separated by new-line characters.
* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
@@ -138,6 +235,9 @@ The Connected Components algorithm identifies parts of a larger graph which are
This implementation uses a [delta iteration](iterations.html): Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
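The propagation that each step performs can be illustrated on plain Scala collections (a hypothetical toy graph, not the Flink API):

```scala
// Toy illustration (not the Flink API): minimum-component propagation per step.
object ComponentStepSketch {
  // undirected edges of a hypothetical toy graph with two components: {1,2,3} and {4,5}
  val edges = Seq((1L, 2L), (2L, 3L), (4L, 5L))

  // one step: every vertex adopts the smallest component ID among itself and its neighbors
  def step(components: Map[Long, Long]): Map[Long, Long] =
    components.map { case (v, c) =>
      val neighborComponents = edges.collect {
        case (a, b) if a == v => components(b)
        case (a, b) if b == v => components(a)
      }
      v -> (neighborComponents :+ c).min
    }

  def main(args: Array[String]): Unit =
    println(step(step(Map(1L -> 1L, 2L -> 2L, 3L -> 3L, 4L -> 4L, 5L -> 5L))))
}
```

In the delta iteration below, vertices whose component ID did not change in a step drop out of the workset, so later steps touch ever fewer vertices.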
+
+
+
~~~java
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
@@ -214,6 +314,51 @@ public static final class ComponentIdFilter
The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `<vertices path>, <edges path>, <result path>, <max number of iterations>`.
+
+
+~~~scala
+// set up execution environment
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+// read vertex and edge data
+// assign the initial components (equal to the vertex id)
+val vertices = getVerticesDataSet(env).map { id => (id, id) }
+
+// build undirected edges by emitting, for each input edge, the edge itself and its
+// inverted version
+val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
+
+// open a delta iteration
+val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
+ (s, ws) =>
+
+ // apply the step logic: join with the edges
+ val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
+ (edge._2, vertex._2)
+ }
+
+ // select the minimum neighbor
+ val minNeighbors = allNeighbors.groupBy(0).min(1)
+
+ // update if the component of the candidate is smaller
+ val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
+ (newVertex, oldVertex, out: Collector[(Long, Long)]) =>
+ if (newVertex._2 < oldVertex._2) out.collect(newVertex)
+ }
+
+ // delta and new workset are identical
+ (updatedComponents, updatedComponents)
+}
+
+verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
+
+~~~
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `<vertices path>, <edges path>, <result path>, <max number of iterations>`.
+
+
Input files are plain text files and must be formatted as follows:
- Vertices represented as IDs and separated by new-line characters.
* For example `"1\n2\n12\n42\n63\n"` gives five vertices with (1), (2), (12), (42), and (63).
@@ -236,7 +381,10 @@ WHERE l_orderkey = o_orderkey
GROUP BY l_orderkey, o_shippriority;
~~~
-The Flink Java program, which implements the above query looks as follows.
+The Flink program implementing the above query looks as follows.
+
+
+
~~~java
// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
@@ -285,6 +433,15 @@ priceSums.writeAsCsv(outputPath);
The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java "Relational Query program" %} implements the above query. It requires the following parameters to run: `<orders path>, <lineitem path>, <result path>`.
+
+Coming soon...
+
+The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala "Relational Query program" %} implements the above query. It requires the following parameters to run: `<orders path>, <lineitem path>, <result path>`.
+
+
The orders and lineitem files can be generated using the [TPC-H benchmark](http://www.tpc.org/tpch/) suite's data generator tool (DBGEN).
Take the following steps to generate arbitrarily large input files for the provided Flink programs:
diff --git a/docs/scala_api_examples.md b/docs/scala_api_examples.md
deleted file mode 100644
index b6689f02d601493c28c9ebe74a20aad054736b5e..0000000000000000000000000000000000000000
--- a/docs/scala_api_examples.md
+++ /dev/null
@@ -1,195 +0,0 @@
----
-title: "Scala API Examples"
----
-
-The following example programs showcase different applications of Flink from simple word counting to graph algorithms.
-The code samples illustrate the use of [Flink's Scala API](scala_api_guide.html).
-
-The full source code of the following and more examples can be found in the [flink-scala-examples](https://github.com/apache/incubator-flink/tree/master/flink-examples/flink-scala-examples) module.
-
-# Word Count
-
-WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are splits the text to individual words. Second, the words are grouped and counted.
-
-```scala
-// read input data
-val input = TextFile(textInput)
-
-// tokenize words
-val words = input.flatMap { _.split(" ") map { (_, 1) } }
-
-// count by word
-val counts = words.groupBy { case (word, _) => word }
- .reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
-
-val output = counts.write(wordsOutput, CsvOutputFormat()))
-```
-
-The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala "WordCount example" %} implements the above described algorithm with input parameters: `, ,