From 912edbca1aab6f204e917032ba82dcb612b25e93 Mon Sep 17 00:00:00 2001 From: mbalassi Date: Mon, 4 May 2015 16:43:38 +0200 Subject: [PATCH] [FLINK-1956] [streaming] Proper opening of rich windowfunctions --- .../api/operators/windowing/WindowFolder.java | 10 +++++++++- .../api/operators/windowing/WindowMapper.java | 11 ++++++++++- .../api/operators/windowing/WindowReducer.java | 10 +++++++++- 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java index 138861ab4f4..04a700b9ac9 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFolder.java @@ -17,8 +17,11 @@ package org.apache.flink.streaming.api.operators.windowing; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FoldFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.windowing.StreamWindow; @@ -38,7 +41,7 @@ public class WindowFolder extends StreamMap, StreamWin withoutInputCopy(); } - private static class WindowFoldFunction implements + private static class WindowFoldFunction extends AbstractRichFunction implements MapFunction, StreamWindow> { private static final long serialVersionUID = 1L; @@ -65,6 +68,11 @@ public class WindowFolder extends StreamMap, StreamWin return outputWindow; } + @Override + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(folder, t); + } + } } diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java index f47bcb70275..400ba243ef2 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMapper.java @@ -17,7 +17,10 @@ package org.apache.flink.streaming.api.operators.windowing; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.functions.WindowMapFunction; import org.apache.flink.streaming.api.operators.StreamMap; @@ -39,7 +42,8 @@ public class WindowMapper extends StreamMap, StreamWin withoutInputCopy(); } - private static class WindowMap implements MapFunction, StreamWindow> { + private static class WindowMap extends AbstractRichFunction + implements MapFunction, StreamWindow> { private static final long serialVersionUID = 1L; WindowMapFunction mapper; @@ -59,6 +63,11 @@ public class WindowMapper extends StreamMap, StreamWin return outputWindow; } + @Override + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(mapper, t); + } + } } \ No newline at end of file diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java index 3dc4038b35d..4143064f331 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowReducer.java @@ -17,8 +17,11 @@ package org.apache.flink.streaming.api.operators.windowing; +import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.streaming.api.datastream.WindowedDataStream; import org.apache.flink.streaming.api.operators.StreamMap; import org.apache.flink.streaming.api.windowing.StreamWindow; @@ -39,7 +42,7 @@ public class WindowReducer extends StreamMap, StreamWindow< withoutInputCopy(); } - private static class WindowReduceFunction implements + private static class WindowReduceFunction extends AbstractRichFunction implements MapFunction, StreamWindow> { private static final long serialVersionUID = 1L; @@ -64,6 +67,11 @@ public class WindowReducer extends StreamMap, StreamWindow< return outputWindow; } + @Override + public void setRuntimeContext(RuntimeContext t) { + FunctionUtils.setFunctionRuntimeContext(reducer, t); + } + } } -- GitLab