diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java index a87e4362f2031b95681ae5003f3e3b8e7be5c990..96e862f975265d4f74ce50778c1500924d10ac04 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java @@ -35,11 +35,13 @@ public class EventTimeTrigger extends Trigger { @Override public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception { - ctx.registerEventTimeTimer(window.maxTimestamp()); - - return (window.maxTimestamp() <= ctx.getCurrentWatermark()) ? - TriggerResult.FIRE_AND_PURGE : - TriggerResult.CONTINUE; + if (window.maxTimestamp() <= ctx.getCurrentWatermark()) { + // if the watermark is already past the window fire immediately + return TriggerResult.FIRE_AND_PURGE; + } else { + ctx.registerEventTimeTimer(window.maxTimestamp()); + return TriggerResult.CONTINUE; + } } @Override