diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index ff519753b6b3ea510b32c6f529bc2de8369f7a16..5b5c34b14a8634ea28970a980231854dda8c0430 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -21,6 +21,10 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; @@ -84,6 +88,7 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator; import org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator; +import org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; @@ -870,6 +875,42 @@ public class DataStream { // Timestamps and watermarks // ------------------------------------------------------------------------ + /** + * Assigns timestamps to the elements in 
the data stream and generates watermarks to + * signal event time progress. + * + *

<p>For each event in the data stream, the {@link TimestampAssigner#extractTimestamp(Object, long)}
+	 * method is called to assign an event timestamp.
+	 *
+	 * <p>For each event in the data stream, the {@link WatermarkGenerator#onEvent(Object, long, WatermarkOutput)}
+	 * method will be called.
+	 *
+	 * <p>Periodically (defined by the {@link ExecutionConfig#getAutoWatermarkInterval()}), the
+	 * {@link WatermarkGenerator#onPeriodicEmit(WatermarkOutput)} method will be called.
+	 *
+	 *

<p>Common watermark generation patterns can be found in the
+	 * {@link org.apache.flink.api.common.eventtime.WatermarkStrategies} class.
+	 *
+	 * @param timestampAssigner The function to assign timestamps to events.
+	 * @param watermarkStrategy The strategy to generate watermarks based on event timestamps.
+	 * @return The stream after the transformation, with assigned timestamps and watermarks.
+	 */
+	public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkStrategy<T> watermarkStrategy) {
+
+		// both user-supplied objects are stored in the operator, so both get the same clean() treatment
+		final TimestampAssigner<T> cleanedAssigner = clean(timestampAssigner);
+		final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
+
+		final TimestampsAndWatermarksOperator<T> operator =
+			new TimestampsAndWatermarksOperator<>(cleanedAssigner, cleanedStrategy);
+
+		// match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship and chain
+		final int inputParallelism = getTransformation().getParallelism();
+
+		return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
+			.setParallelism(inputParallelism);
+	}
+
 	/**
 	 * Assigns timestamps to the elements in the data stream and periodically creates
 	 * watermarks to signal event time progress.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
new file mode 100644
index 0000000000000000000000000000000000000000..c70f026eb6a28f3dc19d529e1ee52acb5a1a527f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndWatermarksOperator.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.operators; + +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.operators.AbstractStreamOperator; +import org.apache.flink.streaming.api.operators.ChainingStrategy; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatus; +import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A stream operator that may do one or both of the following: extract timestamps from + * events and generate watermarks. + * + *

<p>These two responsibilities run in the same operator rather than in two different ones,
+ * because the implementation of the timestamp assigner and the watermark generator is
+ * frequently in the same class (and should be run in the same instance), even though the
+ * separate interfaces support the use of different classes.
+ *
+ * @param <T> The type of the input elements
+ */
+public class TimestampsAndWatermarksOperator<T>
+		extends AbstractStreamOperator<T>
+		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The strategy from which the watermark generator is created in {@link #open()}. */
+	private final WatermarkStrategy<T> watermarkStrategy;
+
+	/** The timestamp assigner. */
+	private final TimestampAssigner<T> timestampAssigner;
+
+	/** The watermark generator, initialized during runtime. */
+	private transient WatermarkGenerator<T> watermarkGenerator;
+
+	/** The watermark output gateway, initialized during runtime. */
+	private transient WatermarkOutput wmOutput;
+
+	/** The interval (in milliseconds) for periodic watermark probes. Initialized during runtime. */
+	private transient long watermarkInterval;
+
+	/**
+	 * Creates the operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}.
+	 *
+	 * @param timestampAssigner The function that assigns a timestamp to each event; must not be null.
+	 * @param watermarkStrategy The strategy that creates the watermark generator; must not be null.
+	 */
+	public TimestampsAndWatermarksOperator(
+			TimestampAssigner<T> timestampAssigner,
+			WatermarkStrategy<T> watermarkStrategy) {
+
+		this.timestampAssigner = checkNotNull(timestampAssigner);
+		this.watermarkStrategy = checkNotNull(watermarkStrategy);
+		this.chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	/**
+	 * Creates the watermark generator and the watermark output, and registers the first
+	 * processing-time timer if the configured auto-watermark interval is positive.
+	 */
+	@Override
+	public void open() throws Exception {
+		super.open();
+
+		watermarkGenerator = watermarkStrategy.createWatermarkGenerator();
+
+		wmOutput = new WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());
+
+		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
+		// an interval of zero or less means no timer is ever registered, so onProcessingTime() never fires
+		if (watermarkInterval > 0) {
+			final long now = getProcessingTimeService().getCurrentProcessingTime();
+			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+		}
+	}
+
+	/**
+	 * Assigns a new timestamp to the record, forwards the record, and then hands the event
+	 * to the watermark generator.
+	 */
+	@Override
+	public void processElement(final StreamRecord<T> element) throws Exception {
+		final T event = element.getValue();
+		// records without a timestamp pass Long.MIN_VALUE to the assigner as the previous timestamp
+		final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
+		final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);
+
+		element.setTimestamp(newTimestamp);
+		output.collect(element);
+		// the record is emitted before the generator sees the event
+		watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
+	}
+
+	/**
+	 * Timer callback: triggers a periodic watermark emission and registers the next timer,
+	 * one watermark interval after the current processing time.
+	 */
+	@Override
+	public void onProcessingTime(long timestamp) throws Exception {
+		watermarkGenerator.onPeriodicEmit(wmOutput);
+
+		final long now = getProcessingTimeService().getCurrentProcessingTime();
+		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
+	}
+
+	/**
+	 * Override the base implementation to completely ignore watermarks propagated from
+	 * upstream, except for the "end of time" watermark.
+	 */
+	@Override
+	public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
+		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
+		// to signal the end of input and to not block watermark progress downstream
+		if (mark.getTimestamp() == Long.MAX_VALUE) {
+			wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
+		}
+	}
+
+	/**
+	 * Closes the operator and gives the watermark generator one final periodic emission.
+	 */
+	@Override
+	public void close() throws Exception {
+		super.close();
+		watermarkGenerator.onPeriodicEmit(wmOutput);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Implementation of the {@link WatermarkOutput}, based on the components
+	 * that are available inside a stream operator.
+	 */
+	private static final class WatermarkEmitter implements WatermarkOutput {
+
+		private final Output<?> output;
+
+		private final StreamStatusMaintainer statusMaintainer;
+
+		/** Timestamp of the last watermark that was forwarded; starts at {@code Long.MIN_VALUE}. */
+		private long currentWatermark;
+
+		/** Set by {@link #markIdle()}, cleared when the next watermark is forwarded. */
+		private boolean idle;
+
+		WatermarkEmitter(Output<?> output, StreamStatusMaintainer statusMaintainer) {
+			this.output = output;
+			this.statusMaintainer = statusMaintainer;
+			this.currentWatermark = Long.MIN_VALUE;
+		}
+
+		/**
+		 * Forwards the watermark if it is greater than the last forwarded one, switching the
+		 * stream status back to active first if the output was marked idle.
+		 */
+		@Override
+		public void emitWatermark(Watermark watermark) {
+			final long ts = watermark.getTimestamp();
+
+			// watermarks that do not advance past the last forwarded one are dropped
+			if (ts <= currentWatermark) {
+				return;
+			}
+
+			currentWatermark = ts;
+
+			if (idle) {
+				idle = false;
+				statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+			}
+
+			output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
+		}
+
+		/** Marks this output as idle and toggles the stream status to idle. */
+		@Override
+		public void markIdle() {
+			idle = true;
+			statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
+		}
+	}
+}