提交 e34fea53 编写于 作者: S Stefan Richter 提交者: Aljoscha Krettek

[FLINK-4134] Retire Late Windows/Elements in WindowOperator

Before, when processing an element that would end up in a late window
(when using a MergingWindowAssigner), the element would be added to the
MergingWindowSet. After determining that the window is late it would not
be removed from the MergingWindowSet. This can lead to problems with
other elements being merged into these "phantom" windows and causing
triggers to be added for empty windows.

This also fixes the same code in EvictingWindowOperator.
上级 bf6df12d
......@@ -133,6 +133,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
// check if the window is already inactive
if (isLate(actualWindow)) {
LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness.");
mergingWindows.retireWindow(actualWindow);
continue;
}
......
......@@ -335,6 +335,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
// drop if the window is already late
if (isLate(actualWindow)) {
LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness.");
mergingWindows.retireWindow(actualWindow);
continue;
}
......
......@@ -1477,6 +1477,8 @@ public class WindowOperatorTest {
// this is dropped as late
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000));
// this is also dropped as late (we test that they are not accidentally merged)
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10100));
testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500));
testHarness.processWatermark(new Watermark(20000));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册