提交 904c9510 编写于 作者: B Bowen Li 提交者: Aljoscha Krettek

[FLINK-7388] Don't set processing time as timestamp in ProcessFunction.onTimer()

上级 8c89f3c6
......@@ -236,3 +236,10 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
</div>
{% top %}
**NOTE:** Before Flink 1.4.0, when called from a processing-time timer, the `ProcessFunction.onTimer()` method sets
the current processing time as event-time timestamp. This behavior is very subtle and might not be noticed by users. Well, it's
harmful because processing-time timestamps are indeterministic and not aligned with watermarks. Besides, user-implemented logic
depends on this wrong timestamp highly likely is unintendedly faulty. So we've decided to fix it. Upon upgrading to 1.4.0, Flink jobs
that are using this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
......@@ -79,7 +79,7 @@ public class KeyedProcessOperator<K, IN, OUT>
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
collector.eraseTimestamp();
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
......
......@@ -99,7 +99,7 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
collector.eraseTimestamp();
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
......
......@@ -157,7 +157,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(17));
expectedOutput.add(new StreamRecord<>(1777, 5L));
expectedOutput.add(new StreamRecord<>(1777));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
......@@ -232,8 +232,8 @@ public class KeyedProcessOperatorTest extends TestLogger {
expectedOutput.add(new StreamRecord<>("INPUT:17"));
expectedOutput.add(new StreamRecord<>("INPUT:42"));
expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
expectedOutput.add(new StreamRecord<>("STATE:17"));
expectedOutput.add(new StreamRecord<>("STATE:42"));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
......@@ -272,7 +272,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
expectedOutput.add(new StreamRecord<>("PROC:1777"));
expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
expectedOutput.add(new Watermark(6));
......
......@@ -177,8 +177,8 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
expectedOutput.add(new StreamRecord<>("INPUT1:17"));
expectedOutput.add(new StreamRecord<>("INPUT2:18"));
expectedOutput.add(new StreamRecord<>("1777", 5L));
expectedOutput.add(new StreamRecord<>("1777", 6L));
expectedOutput.add(new StreamRecord<>("1777"));
expectedOutput.add(new StreamRecord<>("1777"));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
......@@ -266,8 +266,8 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
expectedOutput.add(new StreamRecord<>("INPUT1:17"));
expectedOutput.add(new StreamRecord<>("INPUT2:42"));
expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
expectedOutput.add(new StreamRecord<>("STATE:17"));
expectedOutput.add(new StreamRecord<>("STATE:42"));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
......@@ -316,7 +316,7 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
expectedOutput.add(new StreamRecord<>("PROC:1777"));
expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
expectedOutput.add(new Watermark(6));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册