提交 7084fa26 编写于 作者: A Aljoscha Krettek

[doc] Unify Examples Page

Now contains Java and Scala examples with tabs to switch between the
two.
上级 97d630d5
......@@ -57,8 +57,7 @@
<li>Examples
<ul>
<li><a href="java_api_examples.html">Java API</a></li>
<li><a href="scala_api_examples.html">Scala API</a></li>
<li><a href="examples.html">Bundled Examples</a></li>
<li><a href="example_connectors.html">Connecting to other systems</a></li>
</ul>
</li>
......
---
title: "Java API Examples"
title: "Bundled Examples"
---
* This will be replaced by the TOC
......@@ -7,13 +7,17 @@ title: "Java API Examples"
The following example programs showcase different applications of Flink
from simple word counting to graph algorithms. The code samples illustrate the
use of [Flink's Java API](java_api_guide.html).
use of [Flink's API](programming_guide.html).
The full source code of the following and more examples can be found in the __flink-java-examples__ module.
The full source code of the following and more examples can be found in the __flink-java-examples__
or __flink-scala-examples__ module.
## Word Count
WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are splits the text to individual words. Second, the words are grouped and counted.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
// get input data
DataSet<String> text = getTextDataSet(env);
......@@ -47,12 +51,38 @@ public static final class Tokenizer extends FlatMapFunction<String, Tuple2<Strin
The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/wordcount/WordCount.java "WordCount example" %} implements the above described algorithm with input parameters: `<text input path>, <output path>`. As test data, any text file will do.
</div>
<div data-lang="scala" markdown="1">
~~~scala
val env = ExecutionEnvironment.getExecutionEnvironment
// get input data
val text = getTextDataSet(env)
val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
counts.writeAsCsv(outputPath, "\n", " ")
~~~
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala "WordCount example" %} implements the above described algorithm with input parameters: `<text input path>, <output path>`. As test data, any text file will do.
</div>
</div>
## Page Rank
The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one pages to another page. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and compute its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine which uses the importance of webpages to rank the results of search queries.
In this simple example, PageRank is implemented with a [bulk iteration](java_api_guide.html#iterations) and a fixed number of iterations.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
// get input data
DataSet<Tuple2<Long, Double>> pagesWithRanks = getPagesWithRanksDataSet(env);
......@@ -124,6 +154,73 @@ public static final class EpsilonFilter
The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java "PageRank program" %} implements the above example.
It requires the following parameters to run: `<pages input path>, <links input path>, <output path>, <num pages>, <num iterations>`.
</div>
<div data-lang="scala" markdown="1">
~~~scala
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read input data
val pages = getPagesDataSet(env)
val links = getLinksDataSet(env)
// assign initial ranks to pages
val pagesWithRanks = pages.map(p => Page(p, 1.0 / numPages))
// build adjacency list from link input
val adjacencyLists = links
// initialize lists
.map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
// concatenate lists
.groupBy("sourceId").reduce {
(l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
}
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
currentRanks =>
val newRanks = currentRanks
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
for (targetId <- adjacent.targetIds) {
out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
}
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
// apply dampening factor
.map { p =>
Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages))
}
// terminate if no rank update was significant
val termination = currentRanks.join(newRanks).where("pageId").equalTo("pageId") {
(current, next, out: Collector[Int]) =>
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
(newRanks, termination)
}
val result = finalRanks
// emit result
result.writeAsCsv(outputPath, "\n", " ")
// User-defined types
case class Link(sourceId: Long, targetId: Long)
case class Page(pageId: Long, rank: Double)
case class AdjacencyList(sourceId: Long, targetIds: Array[Long])
~~~
he {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala "PageRank program" %} implements the above example.
It requires the following parameters to run: `<pages input path>, <links input path>, <output path>, <num pages>, <num iterations>`.
</div>
</div>
Input files are plain text files and must be formatted as follows:
- Pages represented as an (long) ID separated by new-line characters.
* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
......@@ -138,6 +235,9 @@ The Connected Components algorithm identifies parts of a larger graph which are
This implementation uses a [delta iteration](iterations.html): Vertices that have not changed their component ID do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
// read vertex and edge data
DataSet<Long> vertices = getVertexDataSet(env);
......@@ -214,6 +314,51 @@ public static final class ComponentIdFilter
The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/ConnectedComponents.java "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `<vertex input path>, <edge input path>, <output path> <max num iterations>`.
</div>
<div data-lang="scala" markdown="1">
~~~scala
// set up execution environment
val env = ExecutionEnvironment.getExecutionEnvironment
// read vertex and edge data
// assign the initial components (equal to the vertex id)
val vertices = getVerticesDataSet(env).map { id => (id, id) }
// undirected edges by emitting for each input edge the input edges itself and an inverted
// version
val edges = getEdgesDataSet(env).flatMap { edge => Seq(edge, (edge._2, edge._1)) }
// open a delta iteration
val verticesWithComponents = vertices.iterateDelta(vertices, maxIterations, Array(0)) {
(s, ws) =>
// apply the step logic: join with the edges
val allNeighbors = ws.join(edges).where(0).equalTo(0) { (vertex, edge) =>
(edge._2, vertex._2)
}
// select the minimum neighbor
val minNeighbors = allNeighbors.groupBy(0).min(1)
// update if the component of the candidate is smaller
val updatedComponents = minNeighbors.join(s).where(0).equalTo(0) {
(newVertex, oldVertex, out: Collector[(Long, Long)]) =>
if (newVertex._2 < oldVertex._2) out.collect(newVertex)
}
// delta and new workset are identical
(updatedComponents, updatedComponents)
}
verticesWithComponents.writeAsCsv(outputPath, "\n", " ")
~~~
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `<vertex input path>, <edge input path>, <output path> <max num iterations>`.
</div>
</div>
Input files are plain text files and must be formatted as follows:
- Vertices represented as IDs and separated by new-line characters.
* For example `"1\n2\n12\n42\n63\n"` gives five vertices with (1), (2), (12), (42), and (63).
......@@ -236,7 +381,10 @@ WHERE l_orderkey = o_orderkey
GROUP BY l_orderkey, o_shippriority;
~~~
The Flink Java program, which implements the above query looks as follows.
The Flink program, which implements the above query looks as follows.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
// get orders data set: (orderkey, orderstatus, orderdate, orderpriority, shippriority)
......@@ -285,6 +433,15 @@ priceSums.writeAsCsv(outputPath);
The {% gh_link /flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/relational/RelationalQuery.java "Relational Query program" %} implements the above query. It requires the following parameters to run: `<orders input path>, <lineitem input path>, <output path>`.
</div>
<div data-lang="scala" markdown="1">
Coming soon...
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala "Relational Query program" %} implements the above query. It requires the following parameters to run: `<orders input path>, <lineitem input path>, <output path>`.
</div>
</div>
The orders and lineitem files can be generated using the [TPC-H benchmark](http://www.tpc.org/tpch/) suite's data generator tool (DBGEN).
Take the following steps to generate arbitrary large input files for the provided Flink programs:
......
---
title: "Scala API Examples"
---
The following example programs showcase different applications of Flink from simple word counting to graph algorithms.
The code samples illustrate the use of [Flink's Scala API](scala_api_guide.html).
The full source code of the following and more examples can be found in the [flink-scala-examples](https://github.com/apache/incubator-flink/tree/master/flink-examples/flink-scala-examples) module.
# Word Count
WordCount is the "Hello World" of Big Data processing systems. It computes the frequency of words in a text collection. The algorithm works in two steps: First, the texts are splits the text to individual words. Second, the words are grouped and counted.
```scala
// read input data
val input = TextFile(textInput)
// tokenize words
val words = input.flatMap { _.split(" ") map { (_, 1) } }
// count by word
val counts = words.groupBy { case (word, _) => word }
.reduce { (w1, w2) => (w1._1, w1._2 + w2._2) }
val output = counts.write(wordsOutput, CsvOutputFormat()))
```
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/wordcount/WordCount.scala "WordCount example" %} implements the above described algorithm with input parameters: `<degree of parallelism>, <text input path>, <output path>`. As test data, any text file will do.
# Page Rank
The PageRank algorithm computes the "importance" of pages in a graph defined by links, which point from one pages to another page. It is an iterative graph algorithm, which means that it repeatedly applies the same computation. In each iteration, each page distributes its current rank over all its neighbors, and compute its new rank as a taxed sum of the ranks it received from its neighbors. The PageRank algorithm was popularized by the Google search engine which uses the importance of webpages to rank the results of search queries.
In this simple example, PageRank is implemented with a [bulk iteration](java_api_guide.html#iterations) and a fixed number of iterations.
```scala
// cases classes so we have named fields
case class PageWithRank(pageId: Long, rank: Double)
case class Edge(from: Long, to: Long, transitionProbability: Double)
// constants for the page rank formula
val dampening = 0.85
val randomJump = (1.0 - dampening) / NUM_VERTICES
val initialRank = 1.0 / NUM_VERTICES
// read inputs
val pages = DataSource(verticesPath, CsvInputFormat[Long]())
val edges = DataSource(edgesPath, CsvInputFormat[Edge]())
// assign initial rank
val pagesWithRank = pages map { p => PageWithRank(p, initialRank) }
// the iterative computation
def computeRank(ranks: DataSet[PageWithRank]) = {
// send rank to neighbors
val ranksForNeighbors = ranks join edges
where { _.pageId } isEqualTo { _.from }
map { (p, e) => (e.to, p.rank * e.transitionProbability) }
// gather ranks per vertex and apply page rank formula
ranksForNeighbors .groupBy { case (node, rank) => node }
.reduce { (a, b) => (a._1, a._2 + b._2) }
.map {case (node, rank) => PageWithRank(node, rank * dampening + randomJump) }
}
// invoke iteratively
val finalRanks = pagesWithRank.iterate(numIterations, computeRank)
val output = finalRanks.write(outputPath, CsvOutputFormat())
```
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala "PageRank program" %} implements the above example.
It requires the following parameters to run: `<pages input path>, <link input path>, <output path>, <num pages>, <num iterations>`.
Input files are plain text files and must be formatted as follows:
- Pages represented as an (long) ID separated by new-line characters.
* For example `"1\n2\n12\n42\n63\n"` gives five pages with IDs 1, 2, 12, 42, and 63.
- Links are represented as pairs of page IDs which are separated by space characters. Links are separated by new-line characters:
* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).
For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
# Connected Components
The Connected Components algorithm identifies parts of a larger graph which are connected by assigning all vertices in the same connected part the same component ID. Similar to PageRank, Connected Components is an iterative algorithm. In each step, each vertex propagates its current component ID to all its neighbors. A vertex accepts the component ID from a neighbor, if it is smaller than its own component ID.
This implementation uses a [delta iteration](iterations.html): Vertices that have not changed their component id do not participate in the next step. This yields much better performance, because the later iterations typically deal only with a few outlier vertices.
```scala
// define case classes
case class VertexWithComponent(vertex: Long, componentId: Long)
case class Edge(from: Long, to: Long)
// get input data
val vertices = DataSource(verticesPath, CsvInputFormat[Long]())
val directedEdges = DataSource(edgesPath, CsvInputFormat[Edge]())
// assign each vertex its own ID as component ID
val initialComponents = vertices map { v => VertexWithComponent(v, v) }
val undirectedEdges = directedEdges flatMap { e => Seq(e, Edge(e.to, e.from)) }
def propagateComponent(s: DataSet[VertexWithComponent], ws: DataSet[VertexWithComponent]) = {
val allNeighbors = ws join undirectedEdges
where { _.vertex } isEqualTo { _.from }
map { (v, e) => VertexWithComponent(e.to, v.componentId ) }
val minNeighbors = allNeighbors groupBy { _.vertex } reduceGroup { cs => cs minBy { _.componentId } }
// updated solution elements == new workset
val s1 = s join minNeighbors
where { _.vertex } isEqualTo { _.vertex }
flatMap { (curr, candidate) =>
if (candidate.componentId < curr.componentId) Some(candidate) else None
}
(s1, s1)
}
val components = initialComponents.iterateWithDelta(initialComponents, { _.vertex }, propagateComponent,
maxIterations)
val output = components.write(componentsOutput, CsvOutputFormat())
```
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ConnectedComponents.scala "ConnectedComponents program" %} implements the above example. It requires the following parameters to run: `<vertex input path>, <edge input path>, <output path> <max num iterations>`.
Input files are plain text files and must be formatted as follows:
- Vertices represented as IDs and separated by new-line characters.
* For example `"1\n2\n12\n42\n63\n"` gives five vertices with (1), (2), (12), (42), and (63).
- Edges are represented as pairs for vertex IDs which are separated by space characters. Edges are separated by new-line characters:
* For example `"1 2\n2 12\n1 12\n42 63\n"` gives four (undirected) links (1)-(2), (2)-(12), (1)-(12), and (42)-(63).
# Relational Query
The Relational Query example assumes two tables, one with `orders` and the other with `lineitems` as specified by the [TPC-H decision support benchmark](http://www.tpc.org/tpch/). TPC-H is a standard benchmark in the database industry. See below for instructions how to generate the input data.
The example implements the following SQL query.
```sql
SELECT l_orderkey, o_shippriority, sum(l_extendedprice) as revenue
FROM orders, lineitem
WHERE l_orderkey = o_orderkey
AND o_orderstatus = "F"
AND YEAR(o_orderdate) > 1993
AND o_orderpriority LIKE "5%"
GROUP BY l_orderkey, o_shippriority;
```
The Flink Scala program, which implements the above query looks as follows.
```scala
// --- define some custom classes to address fields by name ---
case class Order(orderId: Int, status: Char, date: String, orderPriority: String, shipPriority: Int)
case class LineItem(orderId: Int, extendedPrice: Double)
case class PrioritizedOrder(orderId: Int, shipPriority: Int, revenue: Double)
val orders = DataSource(ordersInputPath, DelimitedInputFormat(parseOrder))
val lineItem2600s = DataSource(lineItemsInput, DelimitedInputFormat(parseLineItem))
val filteredOrders = orders filter { o => o.status == "F" && o.date.substring(0, 4).toInt > 1993 && o.orderPriority.startsWith("5") }
val prioritizedItems = filteredOrders join lineItems
where { _.orderId } isEqualTo { _.orderId } // join on the orderIds
map { (o, li) => PrioritizedOrder(o.orderId, o.shipPriority, li.extendedPrice) }
val prioritizedOrders = prioritizedItems
groupBy { pi => (pi.orderId, pi.shipPriority) }
reduce { (po1, po2) => po1.copy(revenue = po1.revenue + po2.revenue) }
val output = prioritizedOrders.write(ordersOutput, CsvOutputFormat(formatOutput))
```
The {% gh_link /flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/relational/RelationalQuery.scala "Relational Query program" %} implements the above query. It requires the following parameters to run: `<orders input path>, <lineitem input path>, <output path>, <degree of parallelism>`.
The orders and lineitem files can be generated using the [TPC-H benchmark](http://www.tpc.org/tpch/) suite's data generator tool (DBGEN).
Take the following steps to generate arbitrary large input files for the provided Flink programs:
1. Download and unpack DBGEN
2. Make a copy of *makefile.suite* called *Makefile* and perform the following changes:
```bash
DATABASE = DB2
MACHINE = LINUX
WORKLOAD = TPCH
CC = gcc
```
1. Build DBGEN using *make*
2. Generate lineitem and orders relations using dbgen. A scale factor
(-s) of 1 results in a generated data set with about 1 GB size.
```bash
./dbgen -T o -s 1
```
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册