From 405d2223697344e41aa11cc66cadf6b9afcacd89 Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Fri, 26 Feb 2016 21:23:56 +0100 Subject: [PATCH] [hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer --- .../operators/windowing/buffers/FoldingWindowBuffer.java | 2 +- .../operators/windowing/buffers/ReducingWindowBuffer.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java index fa44f9daad2..f6c2319951b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/FoldingWindowBuffer.java @@ -63,7 +63,7 @@ public class FoldingWindowBuffer implements WindowBuffer { @Override public void storeElement(StreamRecord element) throws Exception { - data.replace(foldFunction.fold(data.getValue(), element.getValue()), element.getTimestamp()); + data.replace(foldFunction.fold(data.getValue(), element.getValue())); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java index 1f2b6397647..d3bf4b4eee4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/ReducingWindowBuffer.java @@ -57,7 +57,7 @@ public class ReducingWindowBuffer implements WindowBuffer { @Override public void storeElement(StreamRecord element) throws Exception { if (data == null) { - data = new StreamRecord<>(element.getValue(), element.getTimestamp()); + data = element.copy(element.getValue()); } else { data.replace(reduceFunction.reduce(data.getValue(), element.getValue())); } -- GitLab