[FLINK-17661] Add Scala API for using new WatermarkStrategy/WatermarkGenerator

上级 e0b5d51c
......@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.{Internal, Public, PublicEvolving}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.eventtime.{TimestampAssigner, WatermarkGenerator, WatermarkStrategy}
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.operators.ResourceSpec
......@@ -814,32 +815,40 @@ class DataStream[T](stream: JavaStream[T]) {
}
/**
* Assigns timestamps to the elements in the data stream and periodically creates
* watermarks to signal event time progress.
* Assigns timestamps to the elements in the data stream and generates watermarks to signal
* event time progress. The given [[WatermarkStrategy is used to create a [[TimestampAssigner]]
* and [[org.apache.flink.api.common.eventtime.WatermarkGenerator]].
*
* This method creates watermarks periodically (for example every second), based
* on the watermarks indicated by the given watermark generator. Even when no new elements
* in the stream arrive, the given watermark generator will be periodically checked for
* new watermarks. The interval in which watermarks are generated is defined in
* [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]].
* For each event in the data stream, the [[TimestampAssigner#extractTimestamp(Object, long)]]
* method is called to assign an event timestamp.
*
* Use this method for the common cases, where some characteristic over all elements
* should generate the watermarks, or where watermarks are simply trailing behind the
* wall clock time by a certain amount.
* For each event in the data stream, the
* [[WatermarkGenerator#onEvent(Object, long, WatermarkOutput)]] will be called.
*
* For the second case and when the watermarks are required to lag behind the maximum
* timestamp seen so far in the elements of the stream by a fixed amount of time, and this
* amount is known in advance, use the
* [[BoundedOutOfOrdernessTimestampExtractor]].
* Periodically (defined by the [[ExecutionConfig#getAutoWatermarkInterval()]]), the
* [[WatermarkGenerator#onPeriodicEmit(WatermarkOutput)]] method will be called.
*
* Common watermark generation patterns can be found in the
* [[org.apache.flink.api.common.eventtime.WatermarkStrategies]] class.
*/
def assignTimestampsAndWatermarks(watermarkStrategy: WatermarkStrategy[T]): DataStream[T] = {
val cleanedStrategy = clean(watermarkStrategy)
asScalaStream(stream.assignTimestampsAndWatermarks(cleanedStrategy))
}
/**
* Assigns timestamps to the elements in the data stream and periodically creates
* watermarks to signal event time progress.
*
* For cases where watermarks should be created in an irregular fashion, for example
* based on certain markers that some element carry, use the
* [[AssignerWithPunctuatedWatermarks]].
* This method uses the deprecated watermark generator interfaces. Please switch to
* [[assignTimestampsAndWatermarks(WatermarkStrategy]] to use the
* new interfaces instead. The new interfaces support watermark idleness and no longer need
* to differentiate between "periodic" and "punctuated" watermarks.
*
* @see AssignerWithPeriodicWatermarks
* @see AssignerWithPunctuatedWatermarks
* @see #assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks)
* @deprecated please use [[assignTimestampsAndWatermarks()]]
*/
@deprecated
@PublicEvolving
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T]): DataStream[T] = {
asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
......@@ -849,25 +858,14 @@ class DataStream[T](stream: JavaStream[T]) {
* Assigns timestamps to the elements in the data stream and periodically creates
* watermarks to signal event time progress.
*
* This method creates watermarks based purely on stream elements. For each element
* that is handled via [[AssignerWithPunctuatedWatermarks#extractTimestamp(Object, long)]],
* the [[AssignerWithPunctuatedWatermarks#checkAndGetNextWatermark()]] method is called,
* and a new watermark is emitted, if the returned watermark value is larger than the previous
* watermark.
* This method uses the deprecated watermark generator interfaces. Please switch to
* [[assignTimestampsAndWatermarks(WatermarkStrategy]] to use the
* new interfaces instead. The new interfaces support watermark idleness and no longer need
* to differentiate between "periodic" and "punctuated" watermarks.
*
* This method is useful when the data stream embeds watermark elements, or certain elements
* carry a marker that can be used to determine the current event time watermark.
* This operation gives the programmer full control over the watermark generation. Users
* should be aware that too aggressive watermark generation (i.e., generating hundreds of
* watermarks every second) can cost some performance.
*
* For cases where watermarks should be created in a regular fashion, for example
* every x milliseconds, use the [[AssignerWithPeriodicWatermarks]].
*
* @see AssignerWithPunctuatedWatermarks
* @see AssignerWithPeriodicWatermarks
* @see #assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks)
* @deprecated please use [[assignTimestampsAndWatermarks()]]
*/
@deprecated
@PublicEvolving
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T])
: DataStream[T] = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册