Commit a868da83 authored by Aljoscha Krettek

[doc] Update programming guide for scala expression keys

Parent da21ea3e
@@ -872,6 +872,7 @@
values. Keys are "virtual": they are defined as functions over the
actual data to guide the grouping operator.

### Define keys for Tuples
{:.no_toc}
The simplest case is grouping a data set of Tuples on one or more
fields of the Tuple:
@@ -904,6 +905,8 @@
DataSet<Tuple3<Tuple2<Integer, Float>,String,Long>> ds;
Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Integer and Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a string-based expression, as explained below. For this particular example, you would have to specify `f0.f0`.
### Define key using a String Expression
{:.no_toc}
Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
String expressions let you specify the name of the field in a class that you want to group by.
@@ -964,6 +967,7 @@
These are valid expressions for the example POJO above:
Please note that you can only use types inside POJOs that Flink is able to serialize. Currently, we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
### Define key using a Key Selector Function
{:.no_toc}
An additional way to define keys is via "key selector" functions, which
take as argument one dataset element and return a key of an
@@ -1027,64 +1031,133 @@
you do not need to physically pack the data set types into keys and
values. Keys are "virtual": they are defined as functions over the
actual data to guide the grouping operator.
### Define keys for Tuples
{:.no_toc}

The simplest case is grouping a data set of Tuples on one or more
fields of the Tuple:
{% highlight scala %}
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input
  .groupBy(0)
  .reduceGroup(/* do something */)
{% endhighlight %}
The data set is grouped on the first field of the tuples (the one of
Integer type). The group-reduce function will thus receive groups of tuples with
the same value in the first field.
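
As an illustration of what the `/* do something */` group-reduce might look like, here is a minimal sketch (the counting logic is our own addition, not part of the guide) that emits the key together with the size of its group:

{% highlight scala %}
// Count how many tuples share each key. The iterator is buffered so
// the key can be read from the first element before counting.
val counts: DataSet[(Int, Int)] = input
  .groupBy(0)
  .reduceGroup { in: Iterator[(Int, String, Long)] =>
    val buffered = in.buffered
    val key = buffered.head._1
    (key, buffered.size)
  }
{% endhighlight %}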
{% highlight scala %}
val input: DataSet[(Int, String, Long)] = // [...]
val grouped = input
  .groupBy(0, 1)
  .reduce(/* do something */)
{% endhighlight %}
The data set is grouped on the composite key consisting of the first and the
second field, therefore the reduce function will receive groups
with the same value for both fields.
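
A hedged sketch of one possible reduce function for this grouping (summing the third field is our own choice): since elements in a group agree on the first two fields, we keep them and add up the `Long` values:

{% highlight scala %}
// Merge two tuples of the same group: keep the key fields, sum the third.
val summed = input
  .groupBy(0, 1)
  .reduce { (a, b) => (a._1, a._2, a._3 + b._3) }
{% endhighlight %}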
A note on nested Tuples: If you have a DataSet with a nested tuple, such as:

{% highlight scala %}
val ds: DataSet[((Int, Float), String, Long)]
{% endhighlight %}
Specifying `groupBy(0)` will cause the system to use the full `Tuple2` as a key (with the Int and
Float being the key). If you want to "navigate" into the nested `Tuple2`, you have to use a
string-based expression, as explained below. For this particular example, you would have to specify
`"_1._1"`.
### Define key using a String Expression
{:.no_toc}
Starting from release 0.7-incubating, you can use String-based key expressions to select keys.
String expressions let you specify the name of the field in a class that you want to group by.
In the example below, we have a `WC` POJO with two fields "word" and "count". To group by the field
"word", we just pass this name to the `groupBy()` function.
{% highlight scala %}
// some ordinary POJO (Plain Old Java Object)
class WC(var word: String, var count: Int) {
  def this() { this("", 0) }
}
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce(/* do something */)

// or, as a case class, which is less typing
case class WC(word: String, count: Int)
val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce(/* do something */)
{% endhighlight %}
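
To make the `/* do something */` concrete, here is one possible reduce function for the case-class variant (the merging rule is illustrative): elements in a group share the same word, so keep it and add the counts:

{% highlight scala %}
// Merge two WC elements of the same group by summing their counts.
val totals = words
  .groupBy("word")
  .reduce { (a, b) => WC(a.word, a.count + b.count) }
{% endhighlight %}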
**Conditions** for a class to enable using field selection expressions:

- The class must be public.
- It must have a public constructor without arguments, or be a case class.
- All fields either have to be public, or there must be getters and setters for all non-public
  fields. If the field name is `foo`, the getter and setter must be called `foo` and `foo_=`. This
  is what normally gets generated when you have a `var foo` in your class. This also automatically
  applies to case classes, since the getters and setters are generated automatically.

**Valid Expressions**:

- You can select POJO fields by their field name.
- You can select Tuple fields by their field name as well, for example `_1` or `_6`.
- You can select nested fields in POJOs and Tuples. Expressions like `user.zip` or `user.groupId`
  are valid. Flink also supports POJOs inside Tuples: `_2.user.zip`.
- You can select all fields at each level. To select all fields, specify `*`. This also works for
  the nested case: `user.*`.

**Example for nested POJO**
{% highlight scala %}
import org.apache.hadoop.io.IntWritable

class WC(var complex: ComplexNestedClass, var count: Int) {
  def this() { this(null, 0) }
}

class ComplexNestedClass(
    var someNumber: Int,
    var someFloat: Float,
    var word: (Long, Long, String),
    var hadoopCitizen: IntWritable) {
  def this() { this(0, 0, (0, 0, ""), new IntWritable(0)) }
}
{% endhighlight %}
These are valid expressions for the example POJO above:
- `count`: The count field in the `WC` class.
- `complex.*`: Selects all fields in the `ComplexNestedClass`.
- `complex.word._3`: Selects the last field in the Tuple3.
- `complex.hadoopCitizen`: Selects a Hadoop-`Writable` type as a key.
Please note that you can only use types inside POJOs that Flink is able to serialize. Currently,
we are using [Avro](http://avro.apache.org) to serialize arbitrary objects (such as `Date`).
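
As an illustrative sketch (not part of the original guide), such expressions plug directly into `groupBy`:

{% highlight scala %}
val wcs: DataSet[WC] = // [...]

// group on the nested Int field
val byNumber = wcs.groupBy("complex.someNumber")

// group on the String inside the nested Tuple3
val byWord = wcs.groupBy("complex.word._3")
{% endhighlight %}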
### Define key using a Key Selector Function
{:.no_toc}
An additional way to define keys is via "key selector" functions, which
take as argument one dataset element and return a key of an
arbitrary data type by performing an arbitrary computation on this
element. For example:
{% highlight scala %}
// some ordinary case class
case class WC(word: String, count: Int)
val words: DataSet[WC] = // [...]
val wordCounts = words
  .groupBy(_.word)
  .reduce(/* do something */)
{% endhighlight %}
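
Key selectors may also return composite keys; a hedged sketch (pairing these two fields is our own choice):

{% highlight scala %}
// build a composite key from two fields of the element
val byWordAndCount = words
  .groupBy(wc => (wc.word, wc.count))
  .reduceGroup(/* do something */)
{% endhighlight %}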
Remember that keys are not only used for grouping, but also for joining and matching data sets:
{% highlight scala %}
// some case class
case class Rating(name: String, category: String, points: Int)

val ratings: DataSet[Rating] = // [...]
val weights: DataSet[(String, Double)] = // [...]

// Tuples are also case classes in Scala, so we could use this:
val weightedRatings = ratings.join(weights).where("category").equalTo("_1")

// or this:
val weightedRatings2 = ratings.join(weights).where("category").equalTo(0)
{% endhighlight %}
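
By default the join produces pairs of matching elements; one hedged way to consume them (the weighting formula is illustrative):

{% highlight scala %}
// combine each rating with its matching weight into (name, weighted points)
val weightedPoints: DataSet[(String, Double)] = weightedRatings
  .map { case (rating, (_, weight)) => (rating.name, rating.points * weight) }
{% endhighlight %}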
</div>
</div>