Commit 9d1c49fb authored by Stephan Ewen

[Docs] Adjust Java API documentation to be in sync with latest changes

[ci skip]
Parent bcf85c2a
......@@ -5,7 +5,7 @@
# {{ site.CONFIG_KEY }}
#------------------------------------------------------------------------------
FLINK_VERSION_STABLE: 0.6-SNAPSHOT # this variable can point to a SNAPSHOT version in the git source.
FLINK_VERSION_STABLE: 0.6-incubating-SNAPSHOT # this variable can point to a SNAPSHOT version in the git source.
FLINK_VERSION_SHORT: 0.6
FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
FLINK_GITHUB_URL: https://github.com/apache/incubator-flink
......
---
title: "Java API Programming Guide"
---
<section id="top"></section>
Java API
========
<section id="introduction">
<section id="top">
Introduction
------------
......@@ -48,7 +45,7 @@ public class WordCountExample {
env.execute("Word Count Example");
}
public static final class LineSplitter extends FlatMapFunction<String, Tuple2<String, Integer>> {
public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
for (String word : line.split(" ")) {
......@@ -67,7 +64,7 @@ Linking with Flink
To write programs with Flink, you need to include Flink’s Java API library in your project.
The simplest way to do this is to use the [quickstart scripts](java_api_quickstart.html). They create a blank project from a template (a Maven Archetype), which sets up everything for you. To manually create the project, you can use the archetype and create a project by calling:
The simplest way to do this is to use the [quickstart scripts]({{site.baseurl}}/java_api_quickstart.html). They create a blank project from a template (a Maven Archetype), which sets up everything for you. To manually create the project, you can use the archetype and create a project by calling:
```bash
mvn archetype:generate \
......@@ -91,10 +88,13 @@ If you want to add Flink to an existing Maven project, add the following entry t
</dependency>
```
In order to link against the latest SNAPSHOT versions of the code, please follow [this guide]({{site.baseurl}}/downloads.html/#nightly).
If you are using Flink together with Hadoop, the version of the dependency may vary depending on the version of Hadoop (or more specifically, HDFS) that you want to use Flink with.
Please refer to the [downloads page]({{site.baseurl}}/downloads.html) for a list of available versions, and instructions on how to link with custom versions of Hadoop.
In order to link against the latest SNAPSHOT versions of the code, please follow [this guide]({{site.baseurl}}/downloads.html#nightly).
The *flink-clients* dependency is only necessary to invoke the Flink program locally (for example to run it standalone for testing and debugging).
If you intend to only export the program as a JAR file and [run it on a cluster](cluster_execution.html), you can skip that dependency.
If you intend to only export the program as a JAR file and [run it on a cluster]({{site.baseurl}}/cluster_execution.html), you can skip that dependency.
[Back to top](#top)
......@@ -112,7 +112,7 @@ programs with a `main()` method. Each program consists of the same basic parts:
5. Execute your program.
We will now give an overview of each of those steps, but please refer
to the respective sections for more details. Note that all {% gh_link /flink-java/src/main/java/org/apache/flink/api/java "core classes of the Java API" %} are found in the package `org.apache.flinkapi.java`.
to the respective sections for more details. Note that all {% gh_link /flink-java/src/main/java/org/apache/flink/api/java "core classes of the Java API" %} are found in the package `org.apache.flink.api.java`.
The `ExecutionEnvironment` is the basis for all Flink programs. You can
obtain one using these static methods on class `ExecutionEnvironment`:
......@@ -131,8 +131,8 @@ Typically, you only need to use `getExecutionEnvironment()`, since this
will do the right thing depending on the context: if you are executing
your program inside an IDE or as a regular Java program it will create
a local environment that will execute your program on your local machine. If
you created a JAR file from your program, and invoke it through the [command line](cli.html)
or the [web interface](web_client.html),
you created a JAR file from your program, and invoke it through the [command line]({{site.baseurl}}/cli.html)
or the [web interface]({{site.baseurl}}/web_client.html),
the Flink cluster manager will
execute your main method and `getExecutionEnvironment()` will return
an execution environment for executing your program on a cluster.
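For illustration, a minimal sketch of this pattern (a hypothetical standalone program, using `fromElements` as in the data type examples below):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class EnvironmentExample {

    public static void main(String[] args) throws Exception {
        // returns a local environment inside an IDE, and a cluster
        // environment when submitted via the command line or web interface
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> data = env.fromElements("a", "b", "c");
        data.print();

        env.execute("Environment Example");
    }
}
```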
......@@ -202,6 +202,7 @@ machine or submit your program for execution on a cluster, depending on
how you created the execution environment.
[Back to top](#top)
<section id="lazyeval">
Lazy Evaluation
---------------
......@@ -209,6 +210,234 @@ Lazy Evaluation
All Flink programs are executed lazily: When the program's main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to the program's plan. The operations are actually executed when one of the `execute()` methods is invoked on the ExecutionEnvironment object. Whether the program is executed locally or on a cluster depends on the environment of the program.
The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
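As a short sketch of this behavior (the file paths are hypothetical):

```java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// nothing is read yet: this only declares the data source
DataSet<String> lines = env.readTextFile("/path/to/input");

// nothing is transformed yet: this only adds the operation to the plan
DataSet<Integer> lengths = lines.map(new MapFunction<String, Integer>() {
    public Integer map(String line) { return line.length(); }
});

lengths.writeAsText("/path/to/output");

// only now is the whole plan translated and executed
env.execute("Lazy Evaluation Example");
```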
[Back to top](#top)
<section id="transformations">
Transformations
---------------
Data transformations transform one or more DataSets into a new DataSet. Programs can combine multiple transformations into
sophisticated assemblies.
This section gives a brief overview of the available transformations. The [transformations documentation]({{site.baseurl}}/java_api_transformations.html)
has a full description of all transformations with examples.
<table class="table table-bordered">
<thead>
<tr>
<th class="text-center" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Map</strong></td>
<td>
<p>Takes one element and produces one element.</p>
{% highlight java %}
data.map(new MapFunction<String, Integer>() {
public Integer map(String value) { return Integer.parseInt(value); }
});
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>FlatMap</strong></td>
<td>
<p>Takes one element and produces zero, one, or more elements.</p>
{% highlight java %}
data.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String value, Collector<String> out) {
for (String s : value.split(" ")) {
out.collect(s);
}
}
});
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Filter</strong></td>
<td>
<p>Evaluates a boolean function for each element and retains those for which the function returns <i>true</i>.</p>
{% highlight java %}
data.filter(new FilterFunction<Integer>() {
public boolean filter(Integer value) { return value > 1000; }
});
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Reduce</strong></td>
<td>
<p>Combines a group of elements into a single element by repeatedly combining two elements into one.
Reduce may be applied on a full data set, or on a grouped data set.</p>
{% highlight java %}
data.reduce(new ReduceFunction<Integer>() {
public Integer reduce(Integer a, Integer b) { return a + b; }
});
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>ReduceGroup</strong></td>
<td>
<p>Combines a group of elements into one or more elements. ReduceGroup may be applied on a full data set, or on a grouped data set.</p>
{% highlight java %}
data.reduceGroup(new GroupReduceFunction<Integer, Integer>() {
public void reduceGroup(Iterable<Integer> values, Collector<Integer> out) {
int prefixSum = 0;
for (Integer i : values) {
prefixSum += i;
out.collect(prefixSum);
}
}
});
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Aggregate</strong></td>
<td>
<p>Aggregates a group of values into a single value. Aggregation functions can be thought of as built-in reduce functions. Aggregate may be applied on a full data set, or on a grouped data set.</p>
{% highlight java %}
DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input.aggregate(SUM, 0).and(MIN, 2);
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Join</strong></td>
<td>
<p>Joins two data sets by creating all pairs of elements that are equal on their keys. Optionally uses a JoinFunction to turn the pair of elements into a single element. See <a href="#keys">keys</a> for how to define join keys.</p>
{% highlight java %}
result = input1.join(input2)
.where(0) // key of the first input (tuple field 0)
.equalTo(1); // key of the second input (tuple field 1)
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>CoGroup</strong></td>
<td>
<p>The two-dimensional variant of the reduce operation. Groups each input on one or more fields and then joins the groups. The transformation function is called per pair of groups. See <a href="#keys">keys</a> for how to define coGroup keys.</p>
{% highlight java %}
data1.coGroup(data2)
.where(0)
.equalTo(1)
.with(new CoGroupFunction<String, String, String>() {
public void coGroup(Iterable<String> in1, Iterable<String> in2, Collector<String> out) {
out.collect(...);
}
});
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Cross</strong></td>
<td>
<p>Builds the Cartesian product (cross product) of two inputs, creating all pairs of elements. Optionally uses a CrossFunction to turn the pair of elements into a single element.</p>
{% highlight java %}
DataSet<Integer> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<Tuple2<Integer, String>> result = data1.cross(data2);
{% endhighlight %}
</td>
</tr>
<tr>
<td><strong>Union</strong></td>
<td>
<p>Produces the union of two data sets. This operation happens implicitly if more than one data set is used for a specific function input.</p>
{% highlight java %}
DataSet<String> data1 = // [...]
DataSet<String> data2 = // [...]
DataSet<String> result = data1.union(data2);
{% endhighlight %}
</td>
</tr>
</tbody>
</table>
----------
The following transformations are available on data sets of Tuples:
<table class="table table-bordered">
<thead>
<tr>
<th class="text-center" style="width: 20%">Transformation</th>
<th class="text-center">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><strong>Project</strong></td>
<td>
<p>Selects a subset of fields from the tuples.</p>
{% highlight java %}
DataSet<Tuple3<Integer, Double, String>> in = // [...]
DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integer.class);
{% endhighlight %}
</td>
</tr>
</tbody>
</table>
[Back to top](#top)
<section id="keys">
Defining Keys
-------------
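As a brief sketch, keys can be specified either by field position (`Tuple` DataSets only) or with a `KeySelector` function, as used in the transformations above (`WC` is the word-count POJO from the transformation examples):

```java
// key definition via a field position (Tuple DataSets only)
DataSet<Tuple2<String, Integer>> tuples = // [...]
tuples.groupBy(0); // group on the first tuple field

// key definition via a KeySelector function
DataSet<WC> words = // [...]
words.groupBy(new KeySelector<WC, String>() {
    public String getKey(WC wc) { return wc.word; }
});
```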
[Back to top](#top)
<section id="functions">
Functions
---------
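As a brief sketch, functions can be supplied as named classes or inline as anonymous classes, as the examples in this guide do (`ToInteger` is an illustrative name):

```java
// a named function class
public class ToInteger implements MapFunction<String, Integer> {
    public Integer map(String value) { return Integer.parseInt(value); }
}
DataSet<Integer> viaNamedClass = data.map(new ToInteger());

// the same function as an inline anonymous class
DataSet<Integer> viaAnonymousClass = data.map(new MapFunction<String, Integer>() {
    public Integer map(String value) { return Integer.parseInt(value); }
});
```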
[Back to top](#top)
<section id="types">
Data Types
......@@ -248,7 +477,6 @@ public class WordWithCount {
You can use all of those types to parameterize `DataSet` and function implementations, e.g. `DataSet<String>` for a `String` data set or `MapFunction<String, Integer>` for a mapper from `String` to `Integer`.
```java
// using a basic data type
DataSet<String> numbers = env.fromElements("1", "2");
......@@ -341,652 +569,6 @@ The {% gh_link /flink-java/src/main/java/org/apache/flink/api/java/typeutils/Res
[Back to top](#top)
<section id="transformations">
Data Transformations
--------------------
A data transformation transforms one or more `DataSet`s into a new `DataSet`. Advanced data analysis programs can be assembled by chaining multiple transformations.
### Map
The Map transformation applies a user-defined `MapFunction` on each element of a DataSet.
It implements a one-to-one mapping, that is, exactly one element must be returned by
the function.
The following code transforms a `DataSet` of Integer pairs into a `DataSet` of Integers:
```java
// MapFunction that adds two integer values
public class IntAdder extends MapFunction<Tuple2<Integer, Integer>, Integer> {
@Override
public Integer map(Tuple2<Integer, Integer> in) {
return in.f0 + in.f1;
}
}
// [...]
DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
DataSet<Integer> intSums = intPairs.map(new IntAdder());
```
### FlatMap
The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a `DataSet`.
This variant of a map function can return arbitrarily many result elements (including none) for each input element.
The following code transforms a `DataSet` of text lines into a `DataSet` of words:
```java
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer extends FlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector<String> out) {
for (String token : value.split("\\W")) {
out.collect(token);
}
}
}
// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());
```
### Filter
The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataSet` and retains only those elements for which the function returns `true`.
The following code removes all Integers smaller than zero from a `DataSet`:
```java
// FilterFunction that filters out all Integers smaller than zero.
public class NaturalNumberFilter extends FilterFunction<Integer> {
@Override
public boolean filter(Integer number) {
return number >= 0;
}
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
```
### Project (Tuple DataSets only)
The Project transformation removes or moves `Tuple` fields of a `Tuple` `DataSet`.
The `project(int...)` method selects `Tuple` fields that should be retained by their index and defines their order in the output `Tuple`.
The `types(Class<?> ...)` method must give the types of the output `Tuple` fields.
Projections do not require the definition of a user function.
The following code shows different ways to apply a Project transformation on a `DataSet`:
```java
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integer.class);
```
### Transformations on grouped DataSet
The reduce operations can operate on grouped data sets. Specifying the key to
be used for grouping can be done in two ways:
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
Please look at the reduce examples to see how the grouping keys are specified.
### Reduce on grouped DataSet
A Reduce transformation that is applied on a grouped `DataSet` reduces each group to a single element using a user-defined `ReduceFunction`.
For each group of input elements, a `ReduceFunction` successively combines pairs of elements into one element until only a single element for each group remains.
#### Reduce on DataSet grouped by KeySelector Function
A `KeySelector` function extracts a key value from each element of a `DataSet`. The extracted key value is used to group the `DataSet`.
The following code shows how to group a POJO `DataSet` using a `KeySelector` function and to reduce it with a `ReduceFunction`.
```java
// some ordinary POJO
public class WC {
public String word;
public int count;
// [...]
}
// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter extends ReduceFunction<WC> {
@Override
public WC reduce(WC in1, WC in2) {
return new WC(in1.word, in1.count + in2.count);
}
}
// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
// DataSet grouping with inline-defined KeySelector function
.groupBy(
new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
})
// apply ReduceFunction on grouped DataSet
.reduce(new WordCounter());
```
#### Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
Field position keys specify one or more fields of a `Tuple` `DataSet` that are used as grouping keys.
The following code shows how to use field position keys and apply a `ReduceFunction`.
```java
DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
DataSet<Tuple3<String, Integer, Double>> reducedTuples =
tuples
// group DataSet on first and second field of Tuple
.groupBy(0,1)
// apply ReduceFunction on grouped DataSet
.reduce(new MyTupleReducer());
```
### GroupReduce on grouped DataSet
A GroupReduce transformation that is applied on a grouped `DataSet` calls a user-defined `GroupReduceFunction` for each group. The difference
between this and `Reduce` is that the user-defined function gets the whole group at once.
The function is invoked with an iterator over all elements of a group and can return an arbitrary number of result elements using the collector.
#### GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
The following code shows how duplicate strings can be removed from a `DataSet` grouped by Integer.
```java
public class DistinctReduce
extends GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
// Set to hold all unique strings of a group
Set<String> uniqStrings = new HashSet<String>();
@Override
public void reduce(Iterator<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
// clear set
uniqStrings.clear();
// there is at least one element in the iterator
Tuple2<Integer, String> first = in.next();
Integer key = first.f0;
uniqStrings.add(first.f1);
// add all strings of the group to the set
while(in.hasNext()) {
uniqStrings.add(in.next().f1);
}
// emit all unique strings
Tuple2<Integer, String> t = new Tuple2<Integer, String>(key, "");
for(String s : uniqStrings) {
t.f1 = s;
out.collect(t);
}
}
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output =
input
// group DataSet by the first tuple field
.groupBy(0)
// apply GroupReduceFunction on each group and
// remove elements with duplicate strings.
.reduceGroup(new DistinctReduce());
```
**Note:** Flink internally works a lot with mutable objects. Collecting objects like in the above example only works because Strings are immutable in Java!
#### GroupReduce on DataSet grouped by KeySelector Function
Works analogously to `KeySelector` functions in Reduce transformations.
#### GroupReduce on sorted groups (Tuple DataSets only)
A `GroupReduceFunction` accesses the elements of a group using an iterator. Optionally, the iterator can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined `GroupReduceFunction` and improve its efficiency.
Right now, this feature is only available for `Tuple` `DataSet`s.
The following code shows another example of how to remove duplicate Strings from a `DataSet` grouped by an Integer and sorted by String.
```java
// GroupReduceFunction that removes consecutive identical elements
public class DistinctReduce
extends GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
@Override
public void reduce(Iterator<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
// there is at least one element in the iterator
Tuple2<Integer, String> first = in.next();
Integer key = first.f0;
String comp = first.f1;
// for each element in group
while(in.hasNext()) {
String next = in.next().f1;
// check if strings are different
if(!next.equals(comp)) {
// emit a new element
out.collect(new Tuple2<Integer, String>(key, comp));
// update compare string
comp = next;
}
}
// emit last element
out.collect(new Tuple2<Integer, String>(key, comp));
}
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
// group DataSet by the first tuple field
.groupBy(0)
// sort groups on second tuple field
.sortGroup(1, Order.ASCENDING)
// apply GroupReduceFunction on DataSet with sorted groups
.reduceGroup(new DistinctReduce());
```
**Note:** A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
#### Combinable GroupReduceFunctions
In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not implicitly combinable. In order to make a `GroupReduceFunction` combinable, you need to implement (override) the `combine()` method and annotate the `GroupReduceFunction` with the `@Combinable` annotation as shown here:
The following code shows how to compute multiple sums using a combinable `GroupReduceFunction`:
```java
// Combinable GroupReduceFunction that computes two sums.
@Combinable
public class MyCombinableGroupReducer
extends GroupReduceFunction<Tuple3<String, Integer, Double>,
Tuple3<String, Integer, Double>> {
@Override
public void reduce(Iterator<Tuple3<String, Integer, Double>> in,
Collector<Tuple3<String, Integer, Double>> out) {
// one element is always present in iterator
Tuple3<String, Integer, Double> curr = in.next();
String key = curr.f0;
int intSum = curr.f1;
double doubleSum = curr.f2;
// sum up all ints and doubles
while(in.hasNext()) {
curr = in.next();
intSum += curr.f1;
doubleSum += curr.f2;
}
// emit a tuple with both sums
out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
}
@Override
public void combine(Iterator<Tuple3<String, Integer, Double>> in,
Collector<Tuple3<String, Integer, Double>> out) {
// in some cases combine() calls can simply be forwarded to reduce().
this.reduce(in, out);
}
}
```
### Aggregate on grouped Tuple DataSet
Some aggregation operations are needed very frequently. The Aggregate transformation provides the following built-in aggregation functions:
- Sum,
- Min, and
- Max.
The Aggregate transformation can only be applied on a `Tuple` `DataSet` and supports only field position keys for grouping.
The following code shows how to apply an Aggregation transformation on a `DataSet` grouped by field position keys:
```java
DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
// group DataSet on second field
.groupBy(1)
// compute sum of the first field
.aggregate(SUM, 0)
// compute minimum of the third field
.and(MIN, 2);
```
To apply multiple aggregations to the same grouping, use the `.and()` function after the first aggregate: `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
In contrast, `.aggregate(SUM, 0).aggregate(MIN, 2)` applies an aggregation on an aggregation: in the given example, it produces the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
**Note:** The set of aggregation functions will be extended in the future.
### Reduce on full DataSet
The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataSet`.
The `ReduceFunction` subsequently combines pairs of elements into one element until only a single element remains.
The following code shows how to sum all elements of an Integer `DataSet`:
```java
// ReduceFunction that sums Integers
public class IntSummer extends ReduceFunction<Integer> {
@Override
public Integer reduce(Integer num1, Integer num2) {
return num1 + num2;
}
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
```
Reducing a full `DataSet` using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
### GroupReduce on full DataSet
The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a `DataSet`.
A `GroupReduceFunction` can iterate over all elements of a `DataSet` and return an arbitrary number of result elements.
The following example shows how to apply a GroupReduce transformation on a full `DataSet`:
```java
DataSet<Integer> input = // [...]
// apply a (preferably combinable) GroupReduceFunction to a DataSet
DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
```
**Note:** A GroupReduce transformation on a full `DataSet` cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute-intensive operation. See the paragraph on "Combinable `GroupReduceFunction`s" above to learn how to implement a combinable `GroupReduceFunction`.
### Aggregate on full Tuple DataSet
Some aggregation operations are needed very frequently. The Aggregate transformation provides the following built-in aggregation functions:
- Sum,
- Min, and
- Max.
The Aggregate transformation can only be applied on a `Tuple` `DataSet`.
The following code shows how to apply an Aggregation transformation on a full `DataSet`:
```java
DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input
// compute sum of the first field
.aggregate(SUM, 0)
// compute minimum of the second field
.and(MIN, 1);
```
**Note:** Extending the set of supported aggregation functions is on our roadmap.
### Join
The Join transformation joins two `DataSet`s into one `DataSet`. The elements of both `DataSet`s are joined on one or more keys which can be specified using
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
There are a few different ways to perform a Join transformation; they are shown in the following sections.
#### Default Join (Join into Tuple2)
The default Join transformation produces a new `Tuple` `DataSet` with two fields. Each tuple holds a joined element of the first input `DataSet` in the first tuple field and a matching element of the second input `DataSet` in the second field.
The following code shows a default Join transformation using field position keys:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Double, Integer>> input2 = // [...]
// result dataset is typed as Tuple2
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
result =
input1.join(input2)
// key definition on first DataSet using a field position key
.where(0)
// key definition of second DataSet using a field position key
.equalTo(1);
```
#### Join with JoinFunction
A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
A `JoinFunction` receives one element of the first input `DataSet` and one element of the second input `DataSet` and returns exactly one element.
The following code performs a join of a `DataSet` of custom Java objects with a `Tuple` `DataSet` using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
```java
// some POJO
public class Rating {
public String name;
public String category;
public int points;
}
// Join function that joins a custom POJO with a Tuple
public class PointWeighter
extends JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
@Override
public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
// multiply the points and rating and construct a new output tuple
return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
}
}
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<String, Double>>
weightedRatings =
ratings.join(weights)
// key definition of first DataSet using a KeySelector function
.where(new KeySelector<Rating, String>() {
public String getKey(Rating r) { return r.category; }
})
// key definition of second DataSet using a KeySelector function
.equalTo(new KeySelector<Tuple2<String, Double>, String>() {
public String getKey(Tuple2<String, Double> t) { return t.f0; }
})
// applying the JoinFunction on joining pairs
.with(new PointWeighter());
```
#### Join with Projection
A Join transformation can construct result tuples using a projection as shown here:
```java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>>
result =
input1.join(input2)
// key definition on first DataSet using a field position key
.where(0)
// key definition of second DataSet using a field position key
.equalTo(0)
// select and reorder fields of matching tuples
.projectFirst(0,2).projectSecond(1).projectFirst(1)
.types(Integer.class, String.class, Double.class, Byte.class);
```
`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output `Tuple`. The order of indexes defines the order of fields in the output tuple.
The join projection also works for non-`Tuple` `DataSet`s. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output `Tuple`.
#### Join with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to join as shown here:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result1 =
// hint that the second DataSet is very small
input1.joinWithTiny(input2)
.where(0)
.equalTo(0);
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result2 =
// hint that the second DataSet is very large
input1.joinWithHuge(input2)
.where(0)
.equalTo(0);
```
### Cross
The Cross transformation combines two `DataSet`s into one `DataSet`. It builds all pairwise combinations of the elements of both input `DataSet`s, i.e., it builds a Cartesian product.
The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
#### Cross with User-Defined Function
A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
The following code shows how to apply a Cross transformation on two `DataSet`s using a `CrossFunction`:
```java
public class Coord {
public int id;
public int x;
public int y;
}
// CrossFunction computes the Euclidean distance between two Coord objects.
public class EuclideanDistComputer
extends CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
@Override
public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
// compute Euclidean distance of coordinates
double dist = Math.sqrt(Math.pow(c1.x - c2.x, 2) + Math.pow(c1.y - c2.y, 2));
return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
}
}
DataSet<Coord> coords1 = // [...]
DataSet<Coord> coords2 = // [...]
DataSet<Tuple3<Integer, Integer, Double>>
distances =
coords1.cross(coords2)
// apply CrossFunction
.with(new EuclideanDistComputer());
```
#### Cross with Projection
A Cross transformation can also construct result tuples using a projection as shown here:
```java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, Byte, Integer, Double>>
result =
input1.cross(input2)
// select and reorder fields of matching tuples
.projectSecond(0).projectFirst(1,0).projectSecond(1)
.types(Integer.class, Byte.class, Integer.class, Double.class);
```
The field selection in a Cross projection works the same way as in the projection of Join results.
#### Cross with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to cross as shown here:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple4<Integer, String, Integer, String>>
udfResult =
// hint that the second DataSet is very small
input1.crossWithTiny(input2)
// apply any Cross function (or projection)
.with(new MyCrosser());
DataSet<Tuple3<Integer, Integer, String>>
projectResult =
// hint that the second DataSet is very large
input1.crossWithHuge(input2)
// apply a projection (or any Cross function)
.projectFirst(0,1).projectSecond(1).types(Integer.class, String.class, String.class);
```
### CoGroup
The CoGroup transformation jointly processes groups of two `DataSet`s. Both `DataSet`s are grouped on a defined key and groups of both `DataSet`s that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one `DataSet` has a group, the `CoGroupFunction` is called with this group and an empty group.
A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
Similar to Reduce, GroupReduce, and Join, keys can be defined using
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
#### CoGroup on DataSets grouped by Field Position Keys (Tuple DataSets only)
```java
// Some CoGroupFunction definition
class MyCoGrouper
extends CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
// set to hold unique Integer values
Set<Integer> ints = new HashSet<Integer>();
@Override
public void coGroup(Iterator<Tuple2<String, Integer>> iVals,
Iterator<Tuple2<String, Double>> dVals,
Collector<Double> out) {
// clear Integer set
ints.clear();
// add all Integer values in group to set
while(iVals.hasNext()) {
ints.add(iVals.next().f1);
}
// multiply each Double value with each unique Integer value of the group
while(dVals.hasNext()) {
for(Integer i : ints) {
out.collect(dVals.next().f1 * i);
}
}
}
}
// [...]
DataSet<Tuple2<String, Integer>> iVals = // [...]
DataSet<Tuple2<String, Double>> dVals = // [...]
DataSet<Double> output = iVals.coGroup(dVals)
// group first DataSet on first tuple field
.where(0)
// group second DataSet on first tuple field
.equalTo(0)
// apply CoGroup function on each pair of groups
.with(new MyCoGrouper());
```
#### CoGroup on DataSets grouped by Key Selector Function
Works analogously to key selector functions in Join transformations.
### Union
Produces the union of two `DataSet`s, which have to be of the same type. A union of more than two `DataSet`s can be implemented with multiple union calls, as shown here:
```java
DataSet<Tuple2<String, Integer>> vals1 = // [...]
DataSet<Tuple2<String, Integer>> vals2 = // [...]
DataSet<Tuple2<String, Integer>> vals3 = // [...]
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
.union(vals3);
```
[Back to top](#top)
<section id="data_sources">
Data Sources
------------
......@@ -1175,7 +757,7 @@ List<Tuple2<String, Integer>> outData = new ArrayList<Tuple2<String, Integer>>()
myResult.output(new LocalCollectionOutputFormat(outData));
```
**Note:** Collection data sources will only work correctly, if the whole program is executed in the same JVM!
**Note:** Currently, the collection data sink is restricted to local execution, as a debugging tool.
[Back to top](#top)
......@@ -1281,7 +863,7 @@ Semantic annotations can be attached to functions through Java annotations, or p
```java
@ConstantFields("1")
public class DivideFirstbyTwo extends MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
public class DivideFirstbyTwo implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
@Override
public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
value.f0 /= 2;
......
---
title: "Java API Transformations"
---
<section id="top">
DataSet Transformations
-----------------------
This document gives a deep-dive into the available transformations on DataSets. For a general introduction to the
Flink Java API, please refer to the [API guide]({{site.baseurl}}/java_api_guide.html).
### Map
The Map transformation applies a user-defined `MapFunction` on each element of a DataSet.
It implements a one-to-one mapping, that is, exactly one element must be returned by
the function.
The following code transforms a `DataSet` of Integer pairs into a `DataSet` of Integers:
```java
// MapFunction that adds two integer values
public class IntAdder implements MapFunction<Tuple2<Integer, Integer>, Integer> {
@Override
public Integer map(Tuple2<Integer, Integer> in) {
return in.f0 + in.f1;
}
}
// [...]
DataSet<Tuple2<Integer, Integer>> intPairs = // [...]
DataSet<Integer> intSums = intPairs.map(new IntAdder());
```
### FlatMap
The FlatMap transformation applies a user-defined `FlatMapFunction` on each element of a `DataSet`.
This variant of a map function can return arbitrarily many result elements (including none) for each input element.
The following code transforms a `DataSet` of text lines into a `DataSet` of words:
```java
// FlatMapFunction that tokenizes a String by whitespace characters and emits all String tokens.
public class Tokenizer implements FlatMapFunction<String, String> {
@Override
public void flatMap(String value, Collector<String> out) {
for (String token : value.split("\\W")) {
out.collect(token);
}
}
}
// [...]
DataSet<String> textLines = // [...]
DataSet<String> words = textLines.flatMap(new Tokenizer());
```
### Filter
The Filter transformation applies a user-defined `FilterFunction` on each element of a `DataSet` and retains only those elements for which the function returns `true`.
The following code removes all Integers smaller than zero from a `DataSet`:
```java
// FilterFunction that filters out all Integers smaller than zero.
public class NaturalNumberFilter implements FilterFunction<Integer> {
@Override
public boolean filter(Integer number) {
return number >= 0;
}
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> naturalNumbers = intNumbers.filter(new NaturalNumberFilter());
```
### Project (Tuple DataSets only)
The Project transformation removes or moves `Tuple` fields of a `Tuple` `DataSet`.
The `project(int...)` method selects `Tuple` fields that should be retained by their index and defines their order in the output `Tuple`.
The `types(Class<?> ...)` method must give the types of the output `Tuple` fields.
Projections do not require the definition of a user function.
The following code shows different ways to apply a Project transformation on a `DataSet`:
```java
DataSet<Tuple3<Integer, Double, String>> in = // [...]
// converts Tuple3<Integer, Double, String> into Tuple2<String, Integer>
DataSet<Tuple2<String, Integer>> out = in.project(2,0).types(String.class, Integer.class);
```
### Transformations on grouped DataSet
The reduce operations can operate on grouped data sets. Specifying the key to
be used for grouping can be done in two ways:
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
Please look at the reduce examples to see how the grouping keys are specified.
### Reduce on grouped DataSet
A Reduce transformation that is applied on a grouped `DataSet` reduces each group to a single element using a user-defined `ReduceFunction`.
For each group of input elements, a `ReduceFunction` successively combines pairs of elements into one element until only a single element for each group remains.
#### Reduce on DataSet grouped by KeySelector Function
A `KeySelector` function extracts a key value from each element of a `DataSet`. The extracted key value is used to group the `DataSet`.
The following code shows how to group a POJO `DataSet` using a `KeySelector` function and to reduce it with a `ReduceFunction`.
```java
// some ordinary POJO
public class WC {
public String word;
public int count;
// [...]
}
// ReduceFunction that sums Integer attributes of a POJO
public class WordCounter implements ReduceFunction<WC> {
@Override
public WC reduce(WC in1, WC in2) {
return new WC(in1.word, in1.count + in2.count);
}
}
// [...]
DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
// DataSet grouping with inline-defined KeySelector function
.groupBy(
new KeySelector<WC, String>() {
public String getKey(WC wc) { return wc.word; }
})
// apply ReduceFunction on grouped DataSet
.reduce(new WordCounter());
```
#### Reduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
Field position keys specify one or more fields of a `Tuple` `DataSet` that are used as grouping keys.
The following code shows how to use field position keys and apply a `ReduceFunction`.
```java
DataSet<Tuple3<String, Integer, Double>> tuples = // [...]
DataSet<Tuple3<String, Integer, Double>> reducedTuples =
tuples
// group DataSet on first and second field of Tuple
.groupBy(0,1)
// apply ReduceFunction on grouped DataSet
.reduce(new MyTupleReducer());
```
### GroupReduce on grouped DataSet
A GroupReduce transformation that is applied on a grouped `DataSet` calls a user-defined `GroupReduceFunction` for each group. The difference
between this and `Reduce` is that the user-defined function gets the whole group at once.
The function is invoked with an Iterable over all elements of a group and can return an arbitrary number of result elements using the collector.
#### GroupReduce on DataSet grouped by Field Position Keys (Tuple DataSets only)
The following code shows how duplicate strings can be removed from a `DataSet` grouped by Integer.
```java
public class DistinctReduce
implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
@Override
public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
Set<String> uniqStrings = new HashSet<String>();
Integer key = null;
// add all strings of the group to the set
for (Tuple2<Integer, String> t : in) {
key = t.f0;
uniqStrings.add(t.f1);
}
// emit all unique strings.
for (String s : uniqStrings) {
out.collect(new Tuple2<Integer, String>(key, s));
}
}
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
.groupBy(0) // group DataSet by the first tuple field
.reduceGroup(new DistinctReduce()); // apply GroupReduceFunction
```
#### GroupReduce on DataSet grouped by KeySelector Function
Works analogously to `KeySelector` functions in Reduce transformations.
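A minimal sketch, reusing the `WC` POJO from the Reduce example above; `MyGroupReducer` stands in for any `GroupReduceFunction<WC, WC>`:

```java
DataSet<WC> words = // [...]
DataSet<WC> output = words
    // group using a KeySelector function, exactly as in the Reduce example
    .groupBy(new KeySelector<WC, String>() {
        public String getKey(WC wc) { return wc.word; }
    })
    // apply any GroupReduceFunction (MyGroupReducer is hypothetical)
    .reduceGroup(new MyGroupReducer());
```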
#### GroupReduce on sorted groups (Tuple DataSets only)
A `GroupReduceFunction` accesses the elements of a group using an Iterable. Optionally, the Iterable can hand out the elements of a group in a specified order. In many cases this can help to reduce the complexity of a user-defined `GroupReduceFunction` and improve its efficiency.
Right now, this feature is only available for DataSets of Tuples.
The following code shows another example of how to remove duplicate Strings from a `DataSet` grouped by an Integer and sorted by String.
```java
// GroupReduceFunction that removes consecutive identical elements
public class DistinctReduce
implements GroupReduceFunction<Tuple2<Integer, String>, Tuple2<Integer, String>> {
@Override
public void reduce(Iterable<Tuple2<Integer, String>> in, Collector<Tuple2<Integer, String>> out) {
Integer key = null;
String comp = null;
for (Tuple2<Integer, String> t : in) {
key = t.f0;
String next = t.f1;
// check if strings are different
if (comp == null || !next.equals(comp)) {
out.collect(new Tuple2<Integer, String>(key, next));
comp = next;
}
}
}
}
// [...]
DataSet<Tuple2<Integer, String>> input = // [...]
DataSet<Tuple2<Integer, String>> output = input
.groupBy(0) // group DataSet by first field
.sortGroup(1, Order.ASCENDING) // sort groups on second tuple field
.reduceGroup(new DistinctReduce());
```
**Note:** A GroupSort often comes for free if the grouping is established using a sort-based execution strategy of an operator before the reduce operation.
#### Combinable GroupReduceFunctions
In contrast to a `ReduceFunction`, a `GroupReduceFunction` is not necessarily combinable. In order to make a `GroupReduceFunction` combinable, you need to implement (override) the `combine()` method and annotate the `GroupReduceFunction` with the `@Combinable` annotation as shown here:
```java
// Combinable GroupReduceFunction that computes two sums.
// Note that we use the RichGroupReduceFunction because it defines the combine method
@Combinable
public class MyCombinableGroupReducer
extends RichGroupReduceFunction<Tuple3<String, Integer, Double>,
Tuple3<String, Integer, Double>> {
@Override
public void reduce(Iterable<Tuple3<String, Integer, Double>> in,
Collector<Tuple3<String, Integer, Double>> out) {
String key = null;
int intSum = 0;
double doubleSum = 0.0;
for (Tuple3<String, Integer, Double> curr : in) {
key = curr.f0;
intSum += curr.f1;
doubleSum += curr.f2;
}
// emit a tuple with both sums
out.collect(new Tuple3<String, Integer, Double>(key, intSum, doubleSum));
}
@Override
public void combine(Iterable<Tuple3<String, Integer, Double>> in,
Collector<Tuple3<String, Integer, Double>> out) {
// in some cases combine() calls can simply be forwarded to reduce().
this.reduce(in, out);
}
}
```
### Aggregate on grouped Tuple DataSet
Some aggregation operations are needed very frequently. The Aggregate transformation provides the following built-in aggregation functions:
- Sum,
- Min, and
- Max.
The Aggregate transformation can only be applied on a `Tuple` `DataSet` and supports only field position keys for grouping.
The following code shows how to apply an Aggregation transformation on a `DataSet` grouped by field position keys:
```java
DataSet<Tuple3<Integer, String, Double>> input = // [...]
DataSet<Tuple3<Integer, String, Double>> output = input
.groupBy(1) // group DataSet on second field
.aggregate(SUM, 0) // compute sum of the first field
.and(MIN, 2); // compute minimum of the third field
```
To apply multiple aggregations to the same grouping, use the `.and()` function after the first aggregate: `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
In contrast, `.aggregate(SUM, 0).aggregate(MIN, 2)` applies an aggregation on an aggregation: in the given example, it produces the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
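A sketch contrasting the two variants side by side:

```java
DataSet<Tuple3<Integer, String, Double>> input = // [...]

// sum of field 0 and minimum of field 2, both computed on the original groups
DataSet<Tuple3<Integer, String, Double>> multiAgg =
    input.groupBy(1).aggregate(SUM, 0).and(MIN, 2);

// aggregation on an aggregation: the minimum of field 2 is computed
// on the result of the preceding sum aggregation
DataSet<Tuple3<Integer, String, Double>> chainedAgg =
    input.groupBy(1).aggregate(SUM, 0).aggregate(MIN, 2);
```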
**Note:** The set of aggregation functions will be extended in the future.
### Reduce on full DataSet
The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a `DataSet`.
The `ReduceFunction` subsequently combines pairs of elements into one element until only a single element remains.
The following code shows how to sum all elements of an Integer `DataSet`:
```java
// ReduceFunction that sums Integers
public class IntSummer implements ReduceFunction<Integer> {
@Override
public Integer reduce(Integer num1, Integer num2) {
return num1 + num2;
}
}
// [...]
DataSet<Integer> intNumbers = // [...]
DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
```
Reducing a full `DataSet` using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
### GroupReduce on full DataSet
The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a `DataSet`.
A `GroupReduceFunction` can iterate over all elements of a `DataSet` and return an arbitrary number of result elements.
The following example shows how to apply a GroupReduce transformation on a full `DataSet`:
```java
DataSet<Integer> input = // [...]
// apply a (preferably combinable) GroupReduceFunction to a DataSet
DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
```
**Note:** A GroupReduce transformation on a full `DataSet` cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute-intensive operation. See the paragraph on "Combinable `GroupReduceFunction`s" above to learn how to implement a combinable `GroupReduceFunction`.
### Aggregate on full Tuple DataSet
Some aggregation operations are needed very frequently. The Aggregate transformation provides the following built-in aggregation functions:
- Sum,
- Min, and
- Max.
The Aggregate transformation can only be applied on a `Tuple` `DataSet`.
The following code shows how to apply an Aggregation transformation on a full `DataSet`:
```java
DataSet<Tuple2<Integer, Double>> input = // [...]
DataSet<Tuple2<Integer, Double>> output = input
.aggregate(SUM, 0) // compute sum of the first field
.and(MIN, 1); // compute minimum of the second field
```
**Note:** Extending the set of supported aggregation functions is on our roadmap.
### Join
The Join transformation joins two `DataSet`s into one `DataSet`. The elements of both `DataSet`s are joined on one or more keys which can be specified using
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
There are a few different ways to perform a Join transformation; they are shown in the following sections.
#### Default Join (Join into Tuple2)
The default Join transformation produces a new `Tuple` `DataSet` with two fields. Each tuple holds a joined element of the first input `DataSet` in the first tuple field and a matching element of the second input `DataSet` in the second field.
The following code shows a default Join transformation using field position keys:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Double, Integer>> input2 = // [...]
// result dataset is typed as Tuple2
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
result = input1.join(input2)
.where(0) // key of the first input
.equalTo(1); // key of the second input
```
#### Join with JoinFunction
A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
A `JoinFunction` receives one element of the first input `DataSet` and one element of the second input `DataSet` and returns exactly one element.
The following code performs a join of a `DataSet` of custom Java objects with a `Tuple` `DataSet` using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
```java
// some POJO
public class Rating {
public String name;
public String category;
public int points;
}
// Join function that joins a custom POJO with a Tuple
public class PointWeighter
implements JoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
@Override
public Tuple2<String, Double> join(Rating rating, Tuple2<String, Double> weight) {
// multiply the points and rating and construct a new output tuple
return new Tuple2<String, Double>(rating.name, rating.points * weight.f1);
}
}
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<String, Double>>
weightedRatings =
ratings.join(weights)
// key of the first input
.where(new KeySelector<Rating, String>() {
public String getKey(Rating r) { return r.category; }
})
// key of the second input
.equalTo(new KeySelector<Tuple2<String, Double>, String>() {
public String getKey(Tuple2<String, Double> t) { return t.f0; }
})
// applying the JoinFunction on joining pairs
.with(new PointWeighter());
```
#### Join with Projection
A Join transformation can construct result tuples using a projection as shown here:
```java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, String, Double, Byte>>
result =
input1.join(input2)
// key definition on first DataSet using a field position key
.where(0)
// key definition of second DataSet using a field position key
.equalTo(0)
// select and reorder fields of matching tuples
.projectFirst(0,2).projectSecond(1).projectFirst(1)
.types(Integer.class, String.class, Double.class, Byte.class);
```
`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output `Tuple`. The order of indexes defines the order of fields in the output tuple.
The join projection also works for non-`Tuple` `DataSet`s. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output `Tuple`.
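As a sketch of the non-`Tuple` case, reusing the `Rating` POJO from the `JoinFunction` example above (this assumes projections can follow `KeySelector` keys just as they follow field position keys):

```java
DataSet<Rating> ratings = // [...]
DataSet<Tuple2<String, Double>> weights = // [...]
DataSet<Tuple2<Rating, Double>>
  result =
    ratings.join(weights)
           .where(new KeySelector<Rating, String>() {
               public String getKey(Rating r) { return r.category; }
           })
           .equalTo(new KeySelector<Tuple2<String, Double>, String>() {
               public String getKey(Tuple2<String, Double> t) { return t.f0; }
           })
           // projectFirst() without arguments adds the whole Rating element
           .projectFirst().projectSecond(1)
           .types(Rating.class, Double.class);
```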
#### Join with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to join as shown here:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result1 =
// hint that the second DataSet is very small
input1.joinWithTiny(input2)
.where(0)
.equalTo(0);
DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
result2 =
// hint that the second DataSet is very large
input1.joinWithHuge(input2)
.where(0)
.equalTo(0);
```
### Cross
The Cross transformation combines two `DataSet`s into one `DataSet`. It builds all pairwise combinations of the elements of both input `DataSet`s, i.e., it builds a Cartesian product.
The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
#### Cross with User-Defined Function
A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
The following code shows how to apply a Cross transformation on two `DataSet`s using a `CrossFunction`:
```java
public class Coord {
public int id;
public int x;
public int y;
}
// CrossFunction computes the Euclidean distance between two Coord objects.
public class EuclideanDistComputer
implements CrossFunction<Coord, Coord, Tuple3<Integer, Integer, Double>> {
@Override
public Tuple3<Integer, Integer, Double> cross(Coord c1, Coord c2) {
// compute Euclidean distance of coordinates
double dist = Math.sqrt(Math.pow(c1.x - c2.x, 2) + Math.pow(c1.y - c2.y, 2));
return new Tuple3<Integer, Integer, Double>(c1.id, c2.id, dist);
}
}
DataSet<Coord> coords1 = // [...]
DataSet<Coord> coords2 = // [...]
DataSet<Tuple3<Integer, Integer, Double>>
distances =
coords1.cross(coords2)
// apply CrossFunction
.with(new EuclideanDistComputer());
```
#### Cross with Projection
A Cross transformation can also construct result tuples using a projection as shown here:
```java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
DataSet<Tuple4<Integer, Byte, Integer, Double>>
result =
input1.cross(input2)
// select and reorder fields of matching tuples
.projectSecond(0).projectFirst(1,0).projectSecond(1)
.types(Integer.class, Byte.class, Integer.class, Double.class);
```
The field selection in a Cross projection works the same way as in the projection of Join results.
#### Cross with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a `DataSet` to cross as shown here:
```java
DataSet<Tuple2<Integer, String>> input1 = // [...]
DataSet<Tuple2<Integer, String>> input2 = // [...]
DataSet<Tuple4<Integer, String, Integer, String>>
udfResult =
// hint that the second DataSet is very small
input1.crossWithTiny(input2)
// apply any Cross function (or projection)
.with(new MyCrosser());
DataSet<Tuple3<Integer, Integer, String>>
projectResult =
// hint that the second DataSet is very large
input1.crossWithHuge(input2)
// apply a projection (or any Cross function)
.projectFirst(0,1).projectSecond(1).types(Integer.class, String.class, String.class);
```
### CoGroup
The CoGroup transformation jointly processes groups of two `DataSet`s. Both `DataSet`s are grouped on a defined key and groups of both `DataSet`s that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one `DataSet` has a group, the `CoGroupFunction` is called with this group and an empty group.
A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
Similar to Reduce, GroupReduce, and Join, keys can be defined using
- a `KeySelector` function or
- one or more field position keys (`Tuple` `DataSet` only).
#### CoGroup on DataSets grouped by Field Position Keys (Tuple DataSets only)
```java
// Some CoGroupFunction definition
class MyCoGrouper
implements CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Double>, Double> {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> iVals,
Iterable<Tuple2<String, Double>> dVals,
Collector<Double> out) {
Set<Integer> ints = new HashSet<Integer>();
// add all Integer values in group to set
for (Tuple2<String, Integer> val : iVals) {
ints.add(val.f1);
}
// multiply each Double value with each unique Integer value of the group
for (Tuple2<String, Double> val : dVals) {
for (Integer i : ints) {
out.collect(val.f1 * i);
}
}
}
}
// [...]
DataSet<Tuple2<String, Integer>> iVals = // [...]
DataSet<Tuple2<String, Double>> dVals = // [...]
DataSet<Double> output = iVals.coGroup(dVals)
// group first DataSet on first tuple field
.where(0)
// group second DataSet on first tuple field
.equalTo(0)
// apply CoGroup function on each pair of groups
.with(new MyCoGrouper());
```
#### CoGroup on DataSets grouped by Key Selector Function
Works analogously to key selector functions in Join transformations.
### Union
Produces the union of two `DataSet`s, which have to be of the same type. A union of more than two `DataSet`s can be implemented with multiple union calls, as shown here:
```java
DataSet<Tuple2<String, Integer>> vals1 = // [...]
DataSet<Tuple2<String, Integer>> vals2 = // [...]
DataSet<Tuple2<String, Integer>> vals3 = // [...]
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
.union(vals3);
```
[Back to top](#top)