提交 912edbca 编写于 作者: M mbalassi

[FLINK-1956] [streaming] Proper opening of rich windowfunctions

上级 ae867efd
......@@ -17,8 +17,11 @@
package org.apache.flink.streaming.api.operators.windowing;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.windowing.StreamWindow;
......@@ -38,7 +41,7 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
withoutInputCopy();
}
private static class WindowFoldFunction<IN, OUT> implements
private static class WindowFoldFunction<IN, OUT> extends AbstractRichFunction implements
MapFunction<StreamWindow<IN>, StreamWindow<OUT>> {
private static final long serialVersionUID = 1L;
......@@ -65,6 +68,11 @@ public class WindowFolder<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
return outputWindow;
}
@Override
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(folder, t);
}
}
}
......@@ -17,7 +17,10 @@
package org.apache.flink.streaming.api.operators.windowing;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;
......@@ -39,7 +42,8 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
withoutInputCopy();
}
private static class WindowMap<T, R> implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
private static class WindowMap<T, R> extends AbstractRichFunction
implements MapFunction<StreamWindow<T>, StreamWindow<R>> {
private static final long serialVersionUID = 1L;
WindowMapFunction<T, R> mapper;
......@@ -59,6 +63,11 @@ public class WindowMapper<IN, OUT> extends StreamMap<StreamWindow<IN>, StreamWin
return outputWindow;
}
@Override
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(mapper, t);
}
}
}
\ No newline at end of file
......@@ -17,8 +17,11 @@
package org.apache.flink.streaming.api.operators.windowing;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.windowing.StreamWindow;
......@@ -39,7 +42,7 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
withoutInputCopy();
}
private static class WindowReduceFunction<T> implements
private static class WindowReduceFunction<T> extends AbstractRichFunction implements
MapFunction<StreamWindow<T>, StreamWindow<T>> {
private static final long serialVersionUID = 1L;
......@@ -64,6 +67,11 @@ public class WindowReducer<IN> extends StreamMap<StreamWindow<IN>, StreamWindow<
return outputWindow;
}
@Override
public void setRuntimeContext(RuntimeContext t) {
FunctionUtils.setFunctionRuntimeContext(reducer, t);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册