Commit 7ec6d7b5 authored by kl0u, committed by Robert Metzger

[FLINK-3752] Add Per-Kafka-Partition Watermark Generation to the docs

This closes #2142
Parent a973d84b
@@ -198,6 +198,60 @@ Flink on YARN supports automatic restart of lost YARN containers.
If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper.
#### Kafka Consumers and Timestamp Extraction/Watermark Emission
In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself.
In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on
special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka
Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`.
You can specify your custom timestamp extractor/watermark emitter as described
[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one of the
[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you
can pass it to your consumer in the following way:
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181");
properties.setProperty("group.id", "test");
FlinkKafkaConsumer08<String> myConsumer =
new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
DataStream<String> stream = env.addSource(myConsumer);
stream.print();
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// only required for Kafka 0.8
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("group.id", "test")

val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())

val stream = env.addSource(myConsumer)
stream.print()
{% endhighlight %}
</div>
</div>
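Note that the `CustomWatermarkEmitter` used above is not defined in the snippets. A minimal sketch of what such a punctuated assigner might look like follows; the `"<timestamp>,<payload>"` record format and the `WATERMARK` marker payload are assumptions for illustration, not part of the original example:

{% highlight java %}
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical punctuated assigner: assumes records of the form "<timestamp>,<payload>",
// where a payload of "WATERMARK" marks a record carrying the current event-time watermark.
public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {

	@Override
	public long extractTimestamp(String element, long previousElementTimestamp) {
		// the event timestamp is embedded as a long prefix of the record
		return Long.parseLong(element.split(",")[0]);
	}

	@Override
	public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
		// emit a watermark only for the special marker records
		return lastElement.endsWith("WATERMARK") ? new Watermark(extractedTimestamp) : null;
	}
}
{% endhighlight %}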
Internally, an instance of the assigner is executed per Kafka partition.
When such an assigner is specified, for each record read from Kafka, the
`extractTimestamp(T element, long previousElementTimestamp)` method is called to assign a timestamp to the record,
and the `Watermark getCurrentWatermark()` (for periodic) or the
`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) method is called to
determine whether a new watermark should be emitted and with which timestamp.
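For the periodic case, a comparable sketch could look like the following; the class name, the record format, and the 3.5 second out-of-orderness bound are all illustrative assumptions:

{% highlight java %}
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Illustrative periodic assigner: tracks the highest timestamp seen so far and
// emits watermarks that trail it by a fixed (assumed) out-of-orderness bound.
public class PeriodicWatermarkEmitter implements AssignerWithPeriodicWatermarks<String> {

	private static final long MAX_OUT_OF_ORDERNESS = 3500; // 3.5 seconds (assumption)

	private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS;

	@Override
	public long extractTimestamp(String element, long previousElementTimestamp) {
		long timestamp = Long.parseLong(element.split(",")[0]);
		currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
		return timestamp;
	}

	@Override
	public Watermark getCurrentWatermark() {
		// called periodically by Flink; the watermark lags the maximum seen timestamp
		return new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS);
	}
}
{% endhighlight %}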
### Kafka Producer
The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
@@ -116,10 +116,14 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm
Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
The timestamp assigners usually are specified immediately after the data source, but it is not strictly required to do so. A
common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.

In any case, the timestamp assigner needs to be specified before the first operation on event time
(such as the first window operation). As a special case, when using Kafka as the source of a streaming job,
Flink allows the specification of a timestamp assigner / watermark emitter inside
the source (or consumer) itself. More information on how to do so can be found in the
[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
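A sketch of the parse-then-filter-then-assign pattern described above might look as follows; `mySource`, the `MyEvent` type with its `parse` and `isValid` methods, and `MyAssigner` are hypothetical placeholders:

{% highlight java %}
DataStream<MyEvent> withTimestampsAndWatermarks = env
	.addSource(mySource)                              // raw records
	.map(line -> MyEvent.parse(line))                 // parse (MapFunction)
	.filter(event -> event.isValid())                 // filter (FilterFunction)
	.assignTimestampsAndWatermarks(new MyAssigner()); // before any event-time operation
{% endhighlight %}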
**NOTE:** The remainder of this section presents the main interfaces a programmer has
to implement in order to create her own timestamp extractors/watermark emitters.
@@ -132,7 +136,9 @@ To see the pre-implemented extractors that ship with Flink, please refer to the
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
@@ -150,7 +156,9 @@ withTimestampsAndWatermarks
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
@@ -776,8 +776,8 @@ public class DataStream<T> {
}
/**
 * Assigns timestamps to the elements in the data stream and creates watermarks to
 * signal event time progress based on the elements themselves.
*
* <p>This method creates watermarks based purely on stream elements. For each element
* that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},