Commit 69808fdd, authored by Aljoscha Krettek

[doc] Unify "DataSet Transformations" page

Parent c778d289
......@@ -534,7 +534,8 @@ DataSet<Tuple3<Integer, String, Double>> output = input
<div data-lang="scala" markdown="1">
~~~scala
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
~~~
</div>
......@@ -547,8 +548,8 @@ In contrast to that `.aggregate(SUM, 0).aggregate(MIN, 2)` will apply an aggrega
### Reduce on full DataSet
The Reduce transformation applies a user-defined `ReduceFunction` to all elements of a DataSet.
The `ReduceFunction` subsequently combines pairs of elements into one element until only a single element remains.
The Reduce transformation applies a user-defined reduce function to all elements of a DataSet.
The reduce function subsequently combines pairs of elements into one element until only a single element remains.
The following code shows how to sum all elements of an Integer DataSet:
......@@ -573,18 +574,19 @@ DataSet<Integer> sum = intNumbers.reduce(new IntSummer());
<div data-lang="scala" markdown="1">
~~~scala
val intNumbers = env.fromElements(1,2,3)
val sum = intNumbers.reduce (_ + _)
~~~
</div>
</div>
Reducing a full DataSet using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a `ReduceFunction` is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
Reducing a full DataSet using the Reduce transformation implies that the final Reduce operation cannot be done in parallel. However, a reduce function is automatically combinable such that a Reduce transformation does not limit scalability for most use cases.
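The reason a combinable reduce function scales can be sketched in plain Java, without the Flink API. This is only an illustration under the assumption that the reduce function is associative: each partition is reduced on its own (the part that could run in parallel), and only the few partial results go through the final, non-parallel step. The class and method names (`CombinableReduceSketch`, `reduce`, `reduceAll`) are hypothetical and not part of Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class CombinableReduceSketch {

    // Folds one non-empty list into a single value by combining pairs of elements.
    public static int reduce(List<Integer> values, BinaryOperator<Integer> reducer) {
        int result = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            result = reducer.apply(result, values.get(i));
        }
        return result;
    }

    // Pre-reduces every partition independently, then reduces the partial results.
    // Only the last call has to run in a single place.
    public static int reduceAll(List<List<Integer>> partitions, BinaryOperator<Integer> reducer) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            partials.add(reduce(partition, reducer));
        }
        return reduce(partials, reducer);
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2),
                Arrays.asList(3, 4, 5));
        System.out.println(reduceAll(partitions, Integer::sum)); // prints 15
    }
}
```

The final step sees one value per partition rather than every element, which is why the non-parallel part is usually small.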
### GroupReduce on full DataSet
The GroupReduce transformation applies a user-defined `GroupReduceFunction` on all elements of a DataSet.
A `GroupReduceFunction` can iterate over all elements of DataSet and return an arbitrary number of result elements.
The GroupReduce transformation applies a user-defined group-reduce function on all elements of a DataSet.
A group-reduce function can iterate over all elements of a DataSet and return an arbitrary number of result elements.
The following example shows how to apply a GroupReduce transformation on a full DataSet:
......@@ -601,17 +603,22 @@ DataSet<Double> output = input.reduceGroup(new MyGroupReducer());
<div data-lang="scala" markdown="1">
~~~scala
val input: DataSet[Int] = // [...]
val output = input.reduceGroup(new MyGroupReducer())
~~~
</div>
</div>
**Note:** A GroupReduce transformation on a full DataSet cannot be done in parallel if the `GroupReduceFunction` is not combinable. Therefore, this can be a very compute intensive operation. See the paragraph on "Combineable `GroupReduceFunction`s" above to learn how to implement a combinable `GroupReduceFunction`.
**Note:** A GroupReduce transformation on a full DataSet cannot be done in parallel if the
group-reduce function is not combinable. Therefore, this can be a very compute-intensive operation.
See the paragraph on "Combineable Group-Reduce Functions" above to learn how to implement a
combinable group-reduce function.
### Aggregate on full Tuple DataSet
There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following build-in aggregation functions:
There are some common aggregation operations that are frequently used. The Aggregate transformation
provides the following built-in aggregation functions:
- Sum,
- Min, and
......@@ -635,6 +642,8 @@ DataSet<Tuple2<Integer, Double>> output = input
<div data-lang="scala" markdown="1">
~~~scala
val input: DataSet[(Int, String, Double)] = // [...]
val output = input.aggregate(SUM, 0).and(MIN, 2)
~~~
......@@ -647,14 +656,15 @@ DataSet<Tuple2<Integer, Double>> output = input
The Join transformation joins two DataSets into one DataSet. The elements of both DataSets are joined on one or more keys which can be specified using
- a `KeySelector` function or
- a key-selector function or
- one or more field position keys (Tuple DataSet only).
- Case Class Fields
There are a few different ways to perform a Join transformation which are shown in the following.
#### Default Join (Join into Tuple2)
The default Join transformation produces a new TupleDataSet with two fields. Each tuple holds a joined element of the first input DataSet in the first tuple field and a matching element of the second input DataSet in the second field.
The default Join transformation produces a new Tuple DataSet with two fields. Each tuple holds a joined element of the first input DataSet in the first tuple field and a matching element of the second input DataSet in the second field.
The following code shows a default Join transformation using field position keys:
......@@ -675,18 +685,20 @@ DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Double, Integer>>>
<div data-lang="scala" markdown="1">
~~~scala
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Double, Int)] = // [...]
val result = input1.join(input2).where(0).equalTo(1)
~~~
</div>
</div>
#### Join with JoinFunction
#### Join with Join-Function
A Join transformation can also call a user-defined `JoinFunction` to process joining tuples.
A `JoinFunction` receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element.
A Join transformation can also call a user-defined join function to process joining tuples.
A join function receives one element of the first input DataSet and one element of the second input DataSet and returns exactly one element.
The following code performs a join of DataSet with custom java objects and a Tuple DataSet using `KeySelector` functions and shows how to call a user-defined `JoinFunction`:
The following code performs a join of a DataSet with custom Java objects and a Tuple DataSet using key-selector functions and shows how to use a user-defined join function:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
......@@ -734,18 +746,29 @@ DataSet<Tuple2<String, Double>>
<div data-lang="scala" markdown="1">
~~~scala
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Rating] = // [...]
val weights: DataSet[(String, Double)] = // [...]
val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
(rating, weight) => (rating.name, rating.points * weight._2)
}
~~~
</div>
</div>
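To make the semantics of a join with a join function concrete, here is a minimal plain-Java sketch that does not use the Flink API. It assumes an inner join on equal keys: the second input is indexed by key, and the join function is called exactly once per matching pair. The names `JoinSketch`, `join`, and the small `Rating` class are hypothetical stand-ins chosen to mirror the example above.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;

public class JoinSketch {

    // Hypothetical stand-in for the Rating type used in the examples above.
    static class Rating {
        final String name;
        final String category;
        final int points;

        Rating(String name, String category, int points) {
            this.name = name;
            this.category = category;
            this.points = points;
        }
    }

    // Inner join: calls joinFn once for every pair of elements with equal keys.
    public static <L, R, K, O> List<O> join(
            List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey,
            BiFunction<L, R, O> joinFn) {
        // Index the second input by its key.
        Map<K, List<R>> index = new HashMap<>();
        for (R r : right) {
            index.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        // Probe the index with every element of the first input.
        List<O> out = new ArrayList<>();
        for (L l : left) {
            List<R> matches = index.getOrDefault(leftKey.apply(l), Collections.<R>emptyList());
            for (R r : matches) {
                out.add(joinFn.apply(l, r));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rating> ratings = Arrays.asList(
                new Rating("alice", "books", 4),
                new Rating("bob", "films", 3));
        List<Map.Entry<String, Double>> weights = new ArrayList<>();
        weights.add(new AbstractMap.SimpleEntry<>("books", 0.5));

        List<String> weighted = join(
                ratings, weights,
                r -> r.category, w -> w.getKey(),
                (r, w) -> r.name + "=" + (r.points * w.getValue()));
        System.out.println(weighted); // prints [alice=2.0]
    }
}
```

The rating in the `films` category has no matching weight, so it produces no output element.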
#### Join with FlatJoinFunction
#### Join with Flat-Join Function
Analogous to Map and FlatMap, a FlatJoin function behaves in the same
way as a JoinFunction, but instead of returning one element, it can
Analogous to Map and FlatMap, a FlatJoin behaves in the same
way as a Join, but instead of returning one element, it can
return (collect) zero, one, or more elements.
{% highlight java %}
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
public class PointWeighter
implements FlatJoinFunction<Rating, Tuple2<String, Double>, Tuple2<String, Double>> {
@Override
......@@ -760,15 +783,12 @@ public class PointWeighter
DataSet<Tuple2<String, Double>>
weightedRatings =
ratings.join(weights) // [...]
{% endhighlight %}
~~~
#### Join with Projection
#### Join with Projection (Java Only)
A Join transformation can construct result tuples using a projection as shown here:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
......@@ -784,19 +804,28 @@ DataSet<Tuple4<Integer, String, Double, Byte>
.types(Integer.class, String.class, Double.class, Byte.class);
~~~
`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
The join projection works also for non-Tuple DataSets. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output Tuple.
</div>
<div data-lang="scala" markdown="1">
~~~scala
case class Rating(name: String, category: String, points: Int)
val ratings: DataSet[Rating] = // [...]
val weights: DataSet[(String, Double)] = // [...]
val weightedRatings = ratings.join(weights).where("category").equalTo(0) {
  (rating, weight, out: Collector[(String, Double)]) =>
    if (weight._2 > 0.1) out.collect((rating.name, rating.points * weight._2))
}
~~~
</div>
</div>
`projectFirst(int...)` and `projectSecond(int...)` select the fields of the first and second joined input that should be assembled into an output Tuple. The order of indexes defines the order of fields in the output tuple.
The join projection also works for non-Tuple DataSets. In this case, `projectFirst()` or `projectSecond()` must be called without arguments to add a joined element to the output Tuple.
#### Join with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a DataSet to join as shown here:
......@@ -827,6 +856,14 @@ DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
<div data-lang="scala" markdown="1">
~~~scala
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]
// hint that the second DataSet is very small
val result1 = input1.joinWithTiny(input2).where(0).equalTo(0)
// hint that the second DataSet is very large
val result2 = input1.joinWithHuge(input2).where(0).equalTo(0)
~~~
......@@ -836,15 +873,15 @@ DataSet<Tuple2<Tuple2<Integer, String>, Tuple2<Integer, String>>>
### Cross
The Cross transformation combines two DataSets into one DataSet. It builds all pairwise combinations of the elements of both input DataSets, i.e., it builds a Cartesian product.
The Cross transformation either calls a user-defined `CrossFunction` on each pair of elements or applies a projection. Both modes are shown in the following.
The Cross transformation either calls a user-defined cross function on each pair of elements or outputs a Tuple2. Both modes are shown in the following.
**Note:** Cross is potentially a *very* compute-intensive operation which can challenge even large compute clusters!
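The cost warning follows directly from the definition: for inputs of size n and m, the cross function is called n × m times. The plain-Java sketch below illustrates this without the Flink API; `CrossSketch` and `cross` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class CrossSketch {

    // Builds the Cartesian product and applies crossFn to every pair,
    // so the result has first.size() * second.size() elements.
    public static <A, B, O> List<O> cross(List<A> first, List<B> second, BiFunction<A, B, O> crossFn) {
        List<O> out = new ArrayList<>(first.size() * second.size());
        for (A a : first) {
            for (B b : second) {
                out.add(crossFn.apply(a, b));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> pairs = cross(
                Arrays.asList(1, 2, 3),
                Arrays.asList("a", "b"),
                (i, s) -> i + s);
        System.out.println(pairs.size()); // prints 6
        System.out.println(pairs);        // prints [1a, 1b, 2a, 2b, 3a, 3b]
    }
}
```

Two inputs of one million elements each would already require 10^12 calls of the cross function.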
#### Cross with User-Defined Function
A Cross transformation can call a user-defined `CrossFunction`. A `CrossFunction` receives one element of the first input and one element of the second input and returns exactly one result element.
A Cross transformation can call a user-defined cross function. A cross function receives one element of the first input and one element of the second input and returns exactly one result element.
The following code shows how to apply a Cross transformation on two DataSets using a `CrossFunction`:
The following code shows how to apply a Cross transformation on two DataSets using a cross function:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
......@@ -877,23 +914,10 @@ DataSet<Tuple3<Integer, Integer, Double>>
.with(new EuclideanDistComputer());
~~~
</div>
<div data-lang="scala" markdown="1">
~~~scala
~~~
</div>
</div>
#### Cross with Projection
A Cross transformation can also construct result tuples using a projection as shown here:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
DataSet<Tuple3<Integer, Byte, String>> input1 = // [...]
DataSet<Tuple2<Integer, Double>> input2 = // [...]
......@@ -905,18 +929,28 @@ DataSet<Tuple4<Integer, Byte, Integer, Double>
.types(Integer.class, Byte.class, Integer.class, Double.class);
~~~
The field selection in a Cross projection works the same way as in the projection of Join results.
</div>
<div data-lang="scala" markdown="1">
~~~scala
import scala.math.{pow, sqrt}
case class Coord(id: Int, x: Int, y: Int)
val coords1: DataSet[Coord] = // [...]
val coords2: DataSet[Coord] = // [...]
val distances = coords1.cross(coords2) {
(c1, c2) =>
val dist = sqrt(pow(c1.x - c2.x, 2) + pow(c1.y - c2.y, 2))
(c1.id, c2.id, dist)
}
~~~
</div>
</div>
The field selection in a Cross projection works the same way as in the projection of Join results.
#### Cross with DataSet Size Hint
In order to guide the optimizer to pick the right execution strategy, you can hint the size of a DataSet to cross as shown here:
......@@ -947,6 +981,14 @@ DataSet<Tuple3<Integer, Integer, String>>
<div data-lang="scala" markdown="1">
~~~scala
val input1: DataSet[(Int, String)] = // [...]
val input2: DataSet[(Int, String)] = // [...]
// hint that the second DataSet is very small
val result1 = input1.crossWithTiny(input2)
// hint that the second DataSet is very large
val result2 = input1.crossWithHuge(input2)
~~~
......@@ -955,13 +997,14 @@ DataSet<Tuple3<Integer, Integer, String>>
### CoGroup
The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key and groups of both DataSets that share the same key are handed together to a user-defined `CoGroupFunction`. If for a specific key only one DataSet has a group, the `CoGroupFunction` is called with this group and an empty group.
A `CoGroupFunction` can separately iterate over the elements of both groups and return an arbitrary number of result elements.
The CoGroup transformation jointly processes groups of two DataSets. Both DataSets are grouped on a defined key and groups of both DataSets that share the same key are handed together to a user-defined co-group function. If for a specific key only one DataSet has a group, the co-group function is called with this group and an empty group.
A co-group function can separately iterate over the elements of both groups and return an arbitrary number of result elements.
Similar to Reduce, GroupReduce, and Join, keys can be defined using
- a `KeySelector` function or
- one or more field position keys (Tuple DataSet only).
- a key-selector function or
- one or more field position keys (Tuple DataSet only) or
- Case Class fields.
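The grouping behavior described above, including the call with an empty group when only one input has a given key, can be sketched in plain Java without the Flink API. The names `CoGroupSketch`, `CoGroupFn`, and `coGroup` are hypothetical; keys are kept sorted here only to make the output order predictable, which is an assumption of this sketch and not a statement about Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Function;

public class CoGroupSketch {

    // Hypothetical co-group function: sees both groups of one key and may emit any number of results.
    public interface CoGroupFn<A, B, O> {
        void coGroup(List<A> first, List<B> second, List<O> out);
    }

    public static <A, B, K extends Comparable<K>, O> List<O> coGroup(
            List<A> first, List<B> second,
            Function<A, K> firstKey, Function<B, K> secondKey,
            CoGroupFn<A, B, O> fn) {
        // Group both inputs by key.
        Map<K, List<A>> groups1 = new TreeMap<>();
        for (A a : first) {
            groups1.computeIfAbsent(firstKey.apply(a), k -> new ArrayList<>()).add(a);
        }
        Map<K, List<B>> groups2 = new TreeMap<>();
        for (B b : second) {
            groups2.computeIfAbsent(secondKey.apply(b), k -> new ArrayList<>()).add(b);
        }
        // Visit every key that occurs in either input.
        Set<K> keys = new TreeSet<>(groups1.keySet());
        keys.addAll(groups2.keySet());
        List<O> out = new ArrayList<>();
        for (K key : keys) {
            // A key missing from one input yields an empty group for that side.
            fn.coGroup(
                    groups1.getOrDefault(key, Collections.<A>emptyList()),
                    groups2.getOrDefault(key, Collections.<B>emptyList()),
                    out);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> sizes = coGroup(
                Arrays.asList("a1", "b1"),
                Arrays.asList("a2", "c2"),
                s -> s.substring(0, 1), s -> s.substring(0, 1),
                (firstGroup, secondGroup, out) -> out.add(firstGroup.size() + "/" + secondGroup.size()));
        System.out.println(sizes); // prints [1/1, 1/0, 0/1]
    }
}
```

Unlike the join sketch, keys `b` and `c` still trigger a call even though only one input contains them.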
#### CoGroup on DataSets Grouped by Field Position Keys (Tuple DataSets only)
......@@ -1010,7 +1053,19 @@ DataSet<Double> output = iVals.coGroup(dVals)
<div data-lang="scala" markdown="1">
~~~scala
val iVals: DataSet[(String, Int)] = // [...]
val dVals: DataSet[(String, Double)] = // [...]
val output = iVals.coGroup(dVals).where(0).equalTo(0) {
(iVals, dVals, out: Collector[Double]) =>
val ints = iVals.map(_._2).toSet
for (dVal <- dVals) {
for (i <- ints) {
out.collect(dVal._2 * i)
}
}
}
~~~
</div>
......@@ -1031,15 +1086,18 @@ Produces the union of two DataSets, which have to be of the same type. A union o
DataSet<Tuple2<String, Integer>> vals1 = // [...]
DataSet<Tuple2<String, Integer>> vals2 = // [...]
DataSet<Tuple2<String, Integer>> vals3 = // [...]
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2)
.union(vals3);
DataSet<Tuple2<String, Integer>> unioned = vals1.union(vals2).union(vals3);
~~~
</div>
<div data-lang="scala" markdown="1">
~~~scala
val vals1: DataSet[(String, Int)] = // [...]
val vals2: DataSet[(String, Int)] = // [...]
val vals3: DataSet[(String, Int)] = // [...]
val unioned = vals1.union(vals2).union(vals3)
~~~
</div>
......