From 8791d4f020f812593ceca1a047e8b0722c19d879 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Sun, 26 Apr 2015 13:02:59 +0200 Subject: [PATCH] [streaming] Timestamp comparison bugfix --- .../streaming/api/datastream/WindowedDataStream.java | 3 ++- .../streaming/api/windowing/helper/TimestampWrapper.java | 9 +++++++-- .../api/windowing/policy/TimeTriggerPolicyTest.java | 3 +++ .../flink/streaming/examples/windowing/StockPrices.java | 5 +++-- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index da8611e6fd5..66dd4f3aeba 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -450,7 +450,8 @@ public class WindowedDataStream { // discretized stream, we also pass the type of the windowbuffer DiscretizedStream discretized = discretize(transformation, windowBuffer); - if (!(windowBuffer instanceof PreAggregator)) { + if (getEviction() instanceof KeepAllEvictionPolicy + && !(windowBuffer instanceof PreAggregator)) { throw new RuntimeException( "Error in preaggregator logic, parallel time reduce should always be preaggregated"); } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java index a14b1c1ecd0..c2ec7c2e6b7 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java @@ -50,8 +50,13 @@ public class TimestampWrapper implements Serializable { try { @SuppressWarnings("unchecked") TimestampWrapper otherTSW = (TimestampWrapper) other; - return startTime == otherTSW.startTime - && timestamp.getClass() == otherTSW.timestamp.getClass(); + if (timestamp instanceof SystemTimestamp + && otherTSW.timestamp instanceof SystemTimestamp) { + return true; + } else { + return startTime == otherTSW.startTime + && timestamp.getClass() == otherTSW.timestamp.getClass(); + } } catch (ClassCastException e) { return false; } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java index 2fabbae4acc..5b268540d9d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp; import org.apache.flink.streaming.api.windowing.helper.Timestamp; import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper; import org.junit.Test; @@ -111,6 +112,8 @@ public class TimeTriggerPolicyTest { assertNotEquals(new TimeTriggerPolicy(5, new TimestampWrapper(timeStamp, 0)), new TimeTriggerPolicy(5, new TimestampWrapper(timeStamp, 3))); + + assertEquals(SystemTimestamp.getWrapper(), SystemTimestamp.getWrapper()); } @Test diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java index ab29d3c3f60..d745fc54689 100644 --- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java +++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/StockPrices.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.examples.windowing; +import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Random; @@ -203,7 +204,7 @@ public class StockPrices { // DATA TYPES // ************************************************************************* - public static class StockPrice { + public static class StockPrice implements Serializable { private static final long serialVersionUID = 1L; public String symbol; @@ -226,7 +227,7 @@ public class StockPrices { } } - public static class Count { + public static class Count implements Serializable{ private static final long serialVersionUID = 1L; public String symbol; -- GitLab