提交 405d2223 编写于 作者: S Stephan Ewen

[hotfix] Properly copy stream record in ReducingWindowBuffer and FoldingWindowBuffer

上级 f881e707
......@@ -63,7 +63,7 @@ public class FoldingWindowBuffer<T, ACC> implements WindowBuffer<T, ACC> {
@Override
public void storeElement(StreamRecord<T> element) throws Exception {
data.replace(foldFunction.fold(data.getValue(), element.getValue()), element.getTimestamp());
data.replace(foldFunction.fold(data.getValue(), element.getValue()));
}
@Override
......
......@@ -57,7 +57,7 @@ public class ReducingWindowBuffer<T> implements WindowBuffer<T, T> {
@Override
public void storeElement(StreamRecord<T> element) throws Exception {
if (data == null) {
data = new StreamRecord<>(element.getValue(), element.getTimestamp());
data = element.copy(element.getValue());
} else {
data.replace(reduceFunction.reduce(data.getValue(), element.getValue()));
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册