提交 8791d4f0 编写于 作者: G Gyula Fora

[streaming] Timestamp comparison bugfix

上级 b950d5b4
......@@ -450,7 +450,8 @@ public class WindowedDataStream<OUT> {
// discretized stream, we also pass the type of the windowbuffer
DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer);
if (!(windowBuffer instanceof PreAggregator)) {
if (getEviction() instanceof KeepAllEvictionPolicy
&& !(windowBuffer instanceof PreAggregator)) {
throw new RuntimeException(
"Error in preaggregator logic, parallel time reduce should always be preaggregated");
}
......
......@@ -50,8 +50,13 @@ public class TimestampWrapper<T> implements Serializable {
try {
@SuppressWarnings("unchecked")
TimestampWrapper<T> otherTSW = (TimestampWrapper<T>) other;
if (timestamp instanceof SystemTimestamp
&& otherTSW.timestamp instanceof SystemTimestamp) {
return true;
} else {
return startTime == otherTSW.startTime
&& timestamp.getClass() == otherTSW.timestamp.getClass();
}
} catch (ClassCastException e) {
return false;
}
......
......@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
import org.junit.Test;
......@@ -111,6 +112,8 @@ public class TimeTriggerPolicyTest {
assertNotEquals(new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp,
0)), new TimeTriggerPolicy<Integer>(5, new TimestampWrapper<Integer>(timeStamp, 3)));
assertEquals(SystemTimestamp.getWrapper(), SystemTimestamp.getWrapper());
}
@Test
......
......@@ -17,6 +17,7 @@
package org.apache.flink.streaming.examples.windowing;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Random;
......@@ -203,7 +204,7 @@ public class StockPrices {
// DATA TYPES
// *************************************************************************
public static class StockPrice {
public static class StockPrice implements Serializable {
private static final long serialVersionUID = 1L;
public String symbol;
......@@ -226,7 +227,7 @@ public class StockPrices {
}
}
public static class Count {
public static class Count implements Serializable{
private static final long serialVersionUID = 1L;
public String symbol;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册