提交 f0a58f78 编写于 作者: A Aljoscha Krettek

[FLINK-4460] Add documentation for side outputs

上级 639dee3b
title: "Side Outputs"
nav-title: "Side Outputs"
nav-parent_id: streaming
nav-pos: 36
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
* This will be replaced by the TOC
In addition to the main stream that results from `DataStream` operations, you can also produce any
number of additional side output result streams. The type of data in the result streams does not
have to match the type of data in the main stream and the types of the different side outputs can
also differ. This operation can be useful when you want to split a stream of data where you would
normally have to replicate the stream and then filter out from each stream the data that you don't
want to have.
When using side outputs, you first need to define an `OutputTag` that will be used to identify a
side output stream:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
// this needs to be an anonymous inner class, so that we can analyze the type
OutputTag<String> outputTag = new OutputTag<String>("string-side-output") {};
{% endhighlight %}
<div data-lang="scala" markdown="1">
{% highlight scala %}
val outputTag = OutputTag[String]("string-side-output")
{% endhighlight %}
Notice how the `OutputTag` is typed according to the type of elements that the side output stream
Emitting data to a side output it only possible when using a
[ProcessFunction](/dev/stream/process_function.html). In the function, you can use the `Context` parameter
to emit data to a side output identified by an `OutputTag`:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<Integer> input = ...;
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = input
.process(new ProcessFunction<Integer, Integer>() {
public void processElement(
Integer input,
Context ctx,
Collector<Integer> out) throws Exception {
// emit data to regular output
// emit data to side output
ctx.output(sideOutputTag, "sideout-" + String.valueOf(value));
{% endhighlight %}
<div data-lang="scala" markdown="1">
{% highlight scala %}
val input: DataStream[Int] = ...
val outputTag = OutputTag[String]("string-side-output")
val mainDataStream = input
.process(new ProcessFunction[Int, Int] {
override def processElement(
value: Int,
ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
// emit data to regular output
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value))
{% endhighlight %}
For retrieving the side output stream you use `getSideOutput(OutputTag)`
on the result of the `DataStream` operation. This will give you a `DataStream` that is typed
to the result of the side output stream:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
{% endhighlight %}
<div data-lang="scala" markdown="1">
{% highlight scala %}
val outputTag = OutputTag[String]("string-side-output")
val mainDataStream = ...
val sideOutputStream: DataStream[String] = mainDataStream.getSideOutput(outputTag)
{% endhighlight %}
......@@ -973,6 +973,51 @@ input
<span class="label label-info">Note</span> When using the `GlobalWindows` window assigner no
data is ever considered late because the end timestamp of the global window is `Long.MAX_VALUE`.
### Getting late data as a side output
Using Flink's [side output](/dev/stream/side_output.html) feature you can get a stream of the data
that was discarded as late.
You first need to specify that you want to get late data using `sideOutputLateData(OutputTag)` on
the windowed stream. Then, you can get the side-output stream on the result of the windowed
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
DataStream<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
{% endhighlight %}
<div data-lang="scala" markdown="1">
{% highlight scala %}
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
{% endhighlight %}
### Late elements considerations
When specifying an allowed lateness greater than 0, the window along with its content is kept after the watermark passes
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册