提交 7e9557af 编写于 作者: S Stephan Ewen

[docs] Document per-Kafka-partition watermarks

上级 7ccf4c4f
......@@ -324,3 +324,51 @@ class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
*Note:* It is possible to generate a watermark on every single event. However, because each watermark causes some
computation downstream, an excessive number of watermarks degrades performance.
## Timestamps per Kafka Partition
When using [Apache Kafka](connectors/kafka.html) as a data source, each Kafka partition may have a simple event time pattern (ascending
timestamps or bounded out-of-orderness). However, when consuming streams from Kafka, multiple partitions often get consumed in parallel,
interleaving the events from the partitions and destroying the per-partition patterns (this is inherent in how Kafka's consumer clients work).
In that case, you can use Flink's Kafka-partition-aware watermark generation. Using that feature, watermarks are generated inside the
Kafka consumer, per Kafka partition, and the per-partition watermarks are merged in the same way as watermarks are merged on stream shuffles.
For example, if event timestamps are strictly ascending per Kafka partition, generating per-partition watermarks with the
[ascending timestamps watermark generator](event_timestamp_extractors.html#assigners-with-ascending-timestamps) will result in perfect overall watermarks.
The illustrations below show how to use ther per-Kafka-partition watermark generation, and how watermarks propagate through the
streaming dataflow in that case.
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)
{% endhighlight %}
</div>
</div>
<img src="{{ site.baseurl }}/fig/parallel_kafka_watermarks.svg" alt="Generating Watermarks with awareness for Kafka-partitions" class="center" width="80%" />
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册