diff --git a/docs/apis/streaming/connectors/kafka.md b/docs/apis/streaming/connectors/kafka.md index 45ec6adb501c76432a368fe291f622ca4b1ca411..9bd70be603a1067d6a7cfce95b10fe8de1464481 100644 --- a/docs/apis/streaming/connectors/kafka.md +++ b/docs/apis/streaming/connectors/kafka.md @@ -198,6 +198,60 @@ Flink on YARN supports automatic restart of lost YARN containers. If checkpointing is not enabled, the Kafka consumer will periodically commit the offsets to Zookeeper. +#### Kafka Consumers and Timestamp Extraction/Watermark Emission + +In many scenarios, the timestamp of a record is embedded (explicitly or implicitly) in the record itself. +In addition, the user may want to emit watermarks either periodically, or in an irregular fashion, e.g. based on +special records in the Kafka stream that contain the current event-time watermark. For these cases, the Flink Kafka +Consumer allows the specification of an `AssignerWithPeriodicWatermarks` or an `AssignerWithPunctuatedWatermarks`. + +You can specify your custom timestamp extractor/watermark emitter as described +[here]({{ site.baseurl }}/apis/streaming/event_timestamps_watermarks.html), or use one from the +[predefined ones]({{ site.baseurl }}/apis/streaming/event_timestamp_extractors.html). After doing so, you +can pass it to your consumer in the following way: + +
+
+{% highlight java %}
+Properties properties = new Properties();
+properties.setProperty("bootstrap.servers", "localhost:9092");
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181");
+properties.setProperty("group.id", "test");
+
+FlinkKafkaConsumer08<String> myConsumer =
+    new FlinkKafkaConsumer08<>("topic", new SimpleStringSchema(), properties);
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
+
+DataStream<String> stream = env.addSource(myConsumer);
+stream.print();
+{% endhighlight %}
+</div>
+
+{% highlight scala %}
+val properties = new Properties()
+properties.setProperty("bootstrap.servers", "localhost:9092")
+// only required for Kafka 0.8
+properties.setProperty("zookeeper.connect", "localhost:2181")
+properties.setProperty("group.id", "test")
+
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties)
+myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter())
+
+val stream = env.addSource(myConsumer)
+stream.print()
+{% endhighlight %}
+</div>
+
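The snippets above pass a `CustomWatermarkEmitter` to the consumer without showing it. The following is a minimal sketch of what the punctuated variant of such an emitter could look like. To keep it self-contained, Flink's interface is reduced to a plain class, and the record type `MyEvent` with its `isWatermarkMarker()` flag is made up for illustration; a real implementation would extend `AssignerWithPunctuatedWatermarks<MyEvent>` and return a `Watermark` object (or `null` for "no watermark") rather than a `long`.

```java
// Hypothetical record type: the event time is embedded in the record itself,
// and special marker records carry the current event-time watermark.
class MyEvent {
    private final long eventTimestamp;
    private final boolean watermarkMarker;

    MyEvent(long eventTimestamp, boolean watermarkMarker) {
        this.eventTimestamp = eventTimestamp;
        this.watermarkMarker = watermarkMarker;
    }

    long getEventTimestamp() { return eventTimestamp; }
    boolean isWatermarkMarker() { return watermarkMarker; }
}

// Sketch of the punctuated assigner logic (a stand-in for
// AssignerWithPunctuatedWatermarks<MyEvent>, not Flink's actual interface).
class CustomWatermarkEmitter {

    static final long NO_WATERMARK = Long.MIN_VALUE;

    // Called for every record: read the timestamp out of the record itself.
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getEventTimestamp();
    }

    // Called after extractTimestamp: emit a watermark only when the record
    // is one of the special marker records, otherwise emit nothing.
    public long checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.isWatermarkMarker() ? extractedTimestamp : NO_WATERMARK;
    }
}
```

A periodic assigner differs only in the second method: instead of deciding per record, it implements `getCurrentWatermark()`, which Flink invokes at a configured interval.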
+
+Internally, an instance of the assigner is executed per Kafka partition.
+When such an assigner is specified, for each record read from Kafka, the
+`extractTimestamp(T element, long previousElementTimestamp)` method is called to assign a timestamp to the record, and
+the `Watermark getCurrentWatermark()` method (for a periodic assigner) or the
+`Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` method (for a punctuated assigner) is called
+to determine whether a new watermark should be emitted, and with which timestamp.
+
 ### Kafka Producer
 
 The `FlinkKafkaProducer08` writes data to a Kafka topic. The producer can specify a custom partitioner that assigns
diff --git a/docs/apis/streaming/event_timestamps_watermarks.md b/docs/apis/streaming/event_timestamps_watermarks.md
index fad84a6d81a3a108ff02d2918ad7bc2ca9fc8a93..493e11a229b4a4bb38817686f99f8ac78c62dc47 100644
--- a/docs/apis/streaming/event_timestamps_watermarks.md
+++ b/docs/apis/streaming/event_timestamps_watermarks.md
@@ -116,10 +116,14 @@ those timestamps will be overwritten by the TimestampAssigner. Similarly, Waterm
 Timestamp Assigners take a stream and produce a new stream with timestamped elements and watermarks. If the
 original stream had timestamps and/or watermarks already, the timestamp assigner overwrites them.
 
-The timestamp assigners usually are specified immediately after the data source, but it is not strictly required to do so. A
-common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
+Timestamp assigners are usually specified immediately after the data source, but this is not strictly required.
+A common pattern is, for example, to parse (*MapFunction*) and filter (*FilterFunction*) before the timestamp assigner.
 In any case, the timestamp assigner needs to be specified before the first operation on event time
-(such as the first window operation).
+(such as the first window operation). 
As a special case, when using Kafka as the source of a streaming job,
+Flink allows the specification of a timestamp assigner / watermark emitter inside
+the source (or consumer) itself. More information on how to do so can be found in the
+[Kafka Connector documentation]({{ site.baseurl }}/apis/streaming/connectors/kafka.html).
+
 **NOTE:** The remainder of this section presents the main interfaces a programmer has
 to implement in order to create her own timestamp extractors/watermark emitters.
 To see the pre-implemented extractors that ship with Flink, please refer to the
@@ -132,7 +136,9 @@ To see the pre-implemented extractors that ship with Flink, please refer to the
 final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 
-DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
+DataStream<MyEvent> stream = env.readFile(
+        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+        FilePathFilter.createDefaultFilter(), typeInfo);
 
 DataStream<MyEvent> withTimestampsAndWatermarks = stream
         .filter( event -> event.severity() == WARNING )
@@ -150,7 +156,9 @@ withTimestampsAndWatermarks
 val env = StreamExecutionEnvironment.getExecutionEnvironment
 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-val stream: DataStream[MyEvent] = env.addSource(new FlinkKafkaConsumer09[MyEvent](topic, schema, props))
+val stream: DataStream[MyEvent] = env.readFile(
+        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
+        FilePathFilter.createDefaultFilter())
 
 val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
         .filter( _.severity == WARNING )
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 859b96249c6ce0e78549eca78979366c29755d7a..204557db0100ab5cfaaeba394a1d43922d6dee10 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -776,8 +776,8 @@ public class DataStream<T> {
 	}
 
 	/**
-	 * Assigns timestamps to the elements in the data stream and periodically creates
-	 * watermarks to signal event time progress.
+	 * Assigns timestamps to the elements in the data stream and creates watermarks to
+	 * signal event time progress based on the elements themselves.
 	 *

This method creates watermarks based purely on stream elements. For each element * that is handled via {@link AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)},