提交 7bdac1af 编写于 作者: S Stephan Ewen

[FLINK-3578] [streaming scala] Support RichFunctions for Scala WindowFunction and AllWindowFunction

This also consolidates the various wrapper classes for fold() and reduce() functions.

This closes #1765
上级 bb62ab08
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
...@@ -7,7 +7,7 @@ ...@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance * "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at * with the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
...@@ -15,12 +15,25 @@ ...@@ -15,12 +15,25 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.flink.streaming.api.functions.windowing; package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.Public; import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;
/**
* Rich variant of the {@link WindowFunction}. As a {@link RichFunction}, it gives access to the
* {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and tear-down methods:
* {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
* {@link RichFunction#close()}.
*
* @param <IN> The type of the input value.
* @param <OUT> The type of the output value.
* @param <KEY> The type of the key.
* @param <W> The type of {@code Window} that this window function can be applied on.
*/
@Public @Public
public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> { public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> {
......
...@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector; ...@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction { public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window> extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
protected WindowFunction<IN, OUT, KEY, W> wrappedFunction; protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction;
public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) { public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) {
this.wrappedFunction = wrappedFunction; this.wrappedFunction = wrappedFunction;
......
...@@ -24,15 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation ...@@ -24,15 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream} import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
import org.apache.flink.streaming.api.scala.function.AllWindowFunction import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction}
import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/** /**
* An [[AllWindowedStream]] represents a data stream where the stream of * An [[AllWindowedStream]] represents a data stream where the stream of
* elements is split into windows based on a * elements is split into windows based on a
...@@ -56,7 +54,7 @@ import scala.collection.JavaConverters._ ...@@ -56,7 +54,7 @@ import scala.collection.JavaConverters._
*/ */
@Public @Public
class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
/** /**
* Sets the [[Trigger]] that should be used to trigger window emission. * Sets the [[Trigger]] that should be used to trigger window emission.
*/ */
...@@ -121,9 +119,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -121,9 +119,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
throw new NullPointerException("Reduce function must not be null.") throw new NullPointerException("Reduce function must not be null.")
} }
val cleanFun = clean(function) val cleanFun = clean(function)
val reducer = new ReduceFunction[T] { val reducer = new ScalaReduceFunction[T](cleanFun)
def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
}
reduce(reducer) reduce(reducer)
} }
...@@ -160,11 +157,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -160,11 +157,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
throw new NullPointerException("Fold function must not be null.") throw new NullPointerException("Fold function must not be null.")
} }
val cleanFun = clean(function) val cleanFun = clean(function)
val folder = new FoldFunction[T,R] { val folder = new ScalaFoldFunction[T,R](cleanFun)
def fold(acc: R, v: T) = {
cleanFun(acc, v)
}
}
fold(initialValue, folder) fold(initialValue, folder)
} }
...@@ -183,11 +177,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -183,11 +177,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
function: AllWindowFunction[T, R, W]): DataStream[R] = { function: AllWindowFunction[T, R, W]): DataStream[R] = {
val cleanedFunction = clean(function) val cleanedFunction = clean(function)
val javaFunction = new JAllWindowFunction[T, R, W] { val javaFunction = new ScalaAllWindowFunctionWrapper[T, R, W](cleanedFunction)
def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanedFunction(window, elements.asScala, out)
}
}
asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]])) asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
} }
...@@ -206,11 +197,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -206,11 +197,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
val cleanedFunction = clean(function) val cleanedFunction = clean(function)
val applyFunction = new JAllWindowFunction[T, R, W] { val applyFunction = new ScalaAllWindowFunction[T, R, W](cleanedFunction)
def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanedFunction(window, elements.asScala, out)
}
}
asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]])) asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
} }
...@@ -222,22 +210,20 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -222,22 +210,20 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* Arriving data is pre-aggregated using the given pre-aggregation reducer. * Arriving data is pre-aggregated using the given pre-aggregation reducer.
* *
* @param preAggregator The reduce function that is used for pre-aggregation * @param preAggregator The reduce function that is used for pre-aggregation
* @param function The window function. * @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window. * @return The data stream that is the result of applying the window function to the window.
*/ */
def apply[R: TypeInformation]( def apply[R: TypeInformation](
preAggregator: ReduceFunction[T], preAggregator: ReduceFunction[T],
function: AllWindowFunction[T, R, W]): DataStream[R] = { windowFunction: AllWindowFunction[T, R, W]): DataStream[R] = {
val cleanedFunction = clean(function) val cleanedReducer = clean(preAggregator)
val applyFunction = new JAllWindowFunction[T, R, W] { val cleanedWindowFunction = clean(windowFunction)
def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanedFunction(window, elements.asScala, out) val applyFunction = new ScalaAllWindowFunctionWrapper[T, R, W](cleanedWindowFunction)
}
}
val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, returnType)) asScalaStream(javaStream.apply(cleanedReducer, applyFunction, returnType))
} }
/** /**
...@@ -248,30 +234,25 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -248,30 +234,25 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* Arriving data is pre-aggregated using the given pre-aggregation reducer. * Arriving data is pre-aggregated using the given pre-aggregation reducer.
* *
* @param preAggregator The reduce function that is used for pre-aggregation * @param preAggregator The reduce function that is used for pre-aggregation
* @param function The window function. * @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window. * @return The data stream that is the result of applying the window function to the window.
*/ */
def apply[R: TypeInformation]( def apply[R: TypeInformation](
preAggregator: (T, T) => T, preAggregator: (T, T) => T,
function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
if (function == null) {
if (preAggregator == null) {
throw new NullPointerException("Reduce function must not be null.") throw new NullPointerException("Reduce function must not be null.")
} }
if (function == null) { if (windowFunction == null) {
throw new NullPointerException("WindowApply function must not be null.") throw new NullPointerException("WindowApply function must not be null.")
} }
val cleanReducer = clean(preAggregator) val cleanReducer = clean(preAggregator)
val reducer = new ReduceFunction[T] { val cleanWindowFunction = clean(windowFunction)
def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
} val reducer = new ScalaReduceFunction[T](cleanReducer)
val applyFunction = new ScalaAllWindowFunction[T, R, W](cleanWindowFunction)
val cleanApply = clean(function)
val applyFunction = new JAllWindowFunction[T, R, W] {
def apply(window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanApply(window, input.asScala, out)
}
}
val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(reducer, applyFunction, returnType)) asScalaStream(javaStream.apply(reducer, applyFunction, returnType))
...@@ -286,24 +267,22 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -286,24 +267,22 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* *
* @param initialValue Initial value of the fold * @param initialValue Initial value of the fold
* @param preAggregator The reduce function that is used for pre-aggregation * @param preAggregator The reduce function that is used for pre-aggregation
* @param function The window function. * @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window. * @return The data stream that is the result of applying the window function to the window.
*/ */
def apply[R: TypeInformation]( def apply[R: TypeInformation](
initialValue: R, initialValue: R,
preAggregator: FoldFunction[T, R], preAggregator: FoldFunction[T, R],
function: AllWindowFunction[R, R, W]): DataStream[R] = { windowFunction: AllWindowFunction[R, R, W]): DataStream[R] = {
val cleanedFunction = clean(function) val cleanFolder = clean(preAggregator)
val applyFunction = new JAllWindowFunction[R, R, W] { val cleanWindowFunction = clean(windowFunction)
def apply(window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = {
cleanedFunction(window, elements.asScala, out) val applyFunction = new ScalaAllWindowFunctionWrapper[R, R, W](cleanWindowFunction)
}
}
asScalaStream(javaStream.apply( asScalaStream(javaStream.apply(
initialValue, initialValue,
clean(preAggregator), cleanFolder,
applyFunction, applyFunction,
implicitly[TypeInformation[R]])) implicitly[TypeInformation[R]]))
} }
...@@ -317,31 +296,27 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { ...@@ -317,31 +296,27 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* *
* @param initialValue Initial value of the fold * @param initialValue Initial value of the fold
* @param preAggregator The reduce function that is used for pre-aggregation * @param preAggregator The reduce function that is used for pre-aggregation
* @param function The window function. * @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window. * @return The data stream that is the result of applying the window function to the window.
*/ */
def apply[R: TypeInformation]( def apply[R: TypeInformation](
initialValue: R, initialValue: R,
preAggregator: (R, T) => R, preAggregator: (R, T) => R,
function: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { windowFunction: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
if (function == null) {
if (preAggregator == null) {
throw new NullPointerException("Reduce function must not be null.") throw new NullPointerException("Reduce function must not be null.")
} }
if (function == null) { if (windowFunction == null) {
throw new NullPointerException("WindowApply function must not be null.") throw new NullPointerException("WindowApply function must not be null.")
} }
val cleanFolder = clean(preAggregator) val cleanFolder = clean(preAggregator)
val folder = new FoldFunction[T, R] { val cleanWindowFunction = clean(windowFunction)
def fold(v1: R, v2: T) = { cleanFolder(v1, v2) }
} val folder = new ScalaFoldFunction[T, R](cleanFolder)
val applyFunction = new ScalaAllWindowFunction[R, R, W](cleanWindowFunction)
val cleanApply = clean(function)
val applyFunction = new JAllWindowFunction[R, R, W] {
def apply(window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = {
cleanApply(window, input.asScala, out)
}
}
val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType)) asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType))
} }
......
...@@ -25,14 +25,12 @@ import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} ...@@ -25,14 +25,12 @@ import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream}
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.scala.function.WindowFunction import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction} import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper}
import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.evictors.Evictor
import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/** /**
* A [[WindowedStream]] represents a data stream where elements are grouped by * A [[WindowedStream]] represents a data stream where elements are grouped by
* key, and for each key, the stream of elements is split into windows based on a * key, and for each key, the stream of elements is split into windows based on a
...@@ -124,9 +122,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -124,9 +122,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
throw new NullPointerException("Reduce function must not be null.") throw new NullPointerException("Reduce function must not be null.")
} }
val cleanFun = clean(function) val cleanFun = clean(function)
val reducer = new ReduceFunction[T] { val reducer = new ScalaReduceFunction[T](cleanFun)
def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
}
reduce(reducer) reduce(reducer)
} }
...@@ -163,11 +159,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -163,11 +159,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
throw new NullPointerException("Fold function must not be null.") throw new NullPointerException("Fold function must not be null.")
} }
val cleanFun = clean(function) val cleanFun = clean(function)
val folder = new FoldFunction[T,R] { val folder = new ScalaFoldFunction[T, R](cleanFun)
def fold(acc: R, v: T) = {
cleanFun(acc, v)
}
}
fold(initialValue, folder) fold(initialValue, folder)
} }
...@@ -186,13 +178,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -186,13 +178,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
function: WindowFunction[T, R, K, W]): DataStream[R] = { function: WindowFunction[T, R, K, W]): DataStream[R] = {
val cleanFunction = clean(function) val cleanFunction = clean(function)
val javaFunction = new JWindowFunction[T, R, K, W] { val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction)
def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = { asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
cleanFunction.apply(key, window, input.asScala, out)
}
}
asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
} }
/** /**
...@@ -213,11 +200,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -213,11 +200,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
} }
val cleanedFunction = clean(function) val cleanedFunction = clean(function)
val applyFunction = new JWindowFunction[T, R, K, W] { val applyFunction = new ScalaWindowFunction[T, R, K, W](cleanedFunction)
def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanedFunction(key, window, elements.asScala, out)
}
}
asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]])) asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
} }
...@@ -236,16 +220,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -236,16 +220,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
preAggregator: ReduceFunction[T], preAggregator: ReduceFunction[T],
function: WindowFunction[T, R, K, W]): DataStream[R] = { function: WindowFunction[T, R, K, W]): DataStream[R] = {
val cleanedFunction = clean(function) val cleanedPreAggregator = clean(preAggregator)
val cleanedWindowFunction = clean(function)
val applyFunction = new JWindowFunction[T, R, K, W] { val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction)
def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanedFunction.apply(key, window, elements.asScala, out)
}
}
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, resultType)) asScalaStream(javaStream.apply(cleanedPreAggregator, applyFunction, resultType))
} }
/** /**
...@@ -256,31 +237,25 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -256,31 +237,25 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* Arriving data is pre-aggregated using the given pre-aggregation reducer. * Arriving data is pre-aggregated using the given pre-aggregation reducer.
* *
* @param preAggregator The reduce function that is used for pre-aggregation * @param preAggregator The reduce function that is used for pre-aggregation
* @param function The window function. * @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window. * @return The data stream that is the result of applying the window function to the window.
*/ */
def apply[R: TypeInformation]( def apply[R: TypeInformation](
preAggregator: (T, T) => T, preAggregator: (T, T) => T,
function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
if (function == null) { if (preAggregator == null) {
throw new NullPointerException("Reduce function must not be null.") throw new NullPointerException("Reduce function must not be null.")
} }
if (function == null) { if (windowFunction == null) {
throw new NullPointerException("WindowApply function must not be null.") throw new NullPointerException("WindowApply function must not be null.")
} }
val cleanReducer = clean(preAggregator) val cleanReducer = clean(preAggregator)
val reducer = new ReduceFunction[T] { val cleanWindowFunction = clean(windowFunction)
def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) }
} val reducer = new ScalaReduceFunction[T](cleanReducer)
val applyFunction = new ScalaWindowFunction[T, R, K, W](cleanWindowFunction)
val cleanApply = clean(function)
val applyFunction = new JWindowFunction[T, R, K, W] {
def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanApply(key, window, input.asScala, out)
}
}
asScalaStream(javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])) asScalaStream(javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]))
} }
...@@ -303,16 +278,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -303,16 +278,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
function: WindowFunction[R, R, K, W]): DataStream[R] = { function: WindowFunction[R, R, K, W]): DataStream[R] = {
val cleanedFunction = clean(function) val cleanedFunction = clean(function)
val cleanedFoldFunction = clean(foldFunction)
val applyFunction = new JWindowFunction[R, R, K, W] { val applyFunction = new ScalaWindowFunctionWrapper[R, R, K, W](cleanedFunction)
def apply(key: K, window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = {
cleanedFunction.apply(key, window, elements.asScala, out)
}
}
asScalaStream(javaStream.apply( asScalaStream(javaStream.apply(
initialValue, initialValue,
clean(foldFunction), cleanedFoldFunction,
applyFunction, applyFunction,
implicitly[TypeInformation[R]])) implicitly[TypeInformation[R]]))
} }
...@@ -325,32 +297,27 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { ...@@ -325,32 +297,27 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* Arriving data is incrementally aggregated using the given fold function. * Arriving data is incrementally aggregated using the given fold function.
* *
* @param foldFunction The fold function that is used for incremental aggregation * @param foldFunction The fold function that is used for incremental aggregation
* @param function The window function. * @param windowFunction The window function.
* @return The data stream that is the result of applying the window function to the window. * @return The data stream that is the result of applying the window function to the window.
*/ */
def apply[R: TypeInformation]( def apply[R: TypeInformation](
initialValue: R, initialValue: R,
foldFunction: (R, T) => R, foldFunction: (R, T) => R,
function: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { windowFunction: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = {
if (function == null) { if (foldFunction == null) {
throw new NullPointerException("Fold function must not be null.") throw new NullPointerException("Fold function must not be null.")
} }
if (function == null) { if (windowFunction == null) {
throw new NullPointerException("WindowApply function must not be null.") throw new NullPointerException("WindowApply function must not be null.")
} }
val cleanFolder = clean(foldFunction) val cleanFolder = clean(foldFunction)
val folder = new FoldFunction[T, R] { val cleanWindowFunction = clean(windowFunction)
def fold(acc: R, v: T) = { cleanFolder(acc, v) }
} val folder = new ScalaFoldFunction[T, R](cleanFolder)
val applyFunction = new ScalaWindowFunction[R, R, K, W](cleanWindowFunction)
val cleanApply = clean(function)
val applyFunction = new JWindowFunction[R, R, K, W] {
def apply(key: K, window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = {
cleanApply(key, window, input.asScala, out)
}
}
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType)) asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType))
} }
......
...@@ -25,11 +25,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window ...@@ -25,11 +25,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector import org.apache.flink.util.Collector
/** /**
* Base interface for functions that are evaluated over keyed (grouped) windows. * Base interface for functions that are evaluated over non-grouped windows,
* * i.e., windows over all stream partitions.
* @tparam IN The type of the input value. *
* @tparam OUT The type of the output value. * @tparam IN The type of the input value.
*/ * @tparam OUT The type of the output value.
*/
@Public @Public
trait AllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable { trait AllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function
import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.streaming.api.windowing.windows.Window
/**
 * An [[org.apache.flink.streaming.api.scala.function.AllWindowFunction]] that is also a
 * [[org.apache.flink.api.common.functions.RichFunction]].
 *
 * Because it extends [[org.apache.flink.api.common.functions.AbstractRichFunction]],
 * implementations can reach the [[org.apache.flink.api.common.functions.RuntimeContext]]
 * and take part in the function life cycle through its setup and tear-down methods.
 *
 * @tparam IN The type of the input value.
 * @tparam OUT The type of the output value.
 * @tparam W The type of Window that this window function can be applied on.
 */
abstract class RichAllWindowFunction[IN, OUT, W <: Window]
  extends AbstractRichFunction with AllWindowFunction[IN, OUT, W]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function
import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.streaming.api.windowing.windows.Window
/**
 * Base class for implementing a
 * [[org.apache.flink.streaming.api.scala.function.WindowFunction]] that is also a
 * [[org.apache.flink.api.common.functions.RichFunction]].
 *
 * It mixes the Scala `WindowFunction` trait into
 * [[org.apache.flink.api.common.functions.AbstractRichFunction]] and declares no members of
 * its own; subclasses get access to the
 * [[org.apache.flink.api.common.functions.RuntimeContext]] as well as the setup and
 * tear-down methods of the rich function.
 *
 * @tparam IN The type of the input value.
 * @tparam OUT The type of the output value.
 * @tparam KEY The type of the key.
 * @tparam W The type of Window that this window function can be applied on.
 */
abstract class RichWindowFunction[IN, OUT, KEY, W <: Window]
  extends AbstractRichFunction
  with WindowFunction[IN, OUT, KEY, W]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/**
 * Adapter that makes a Scala `(window, elements, collector) => Unit` function usable as a
 * Java AllWindowFunction.
 *
 * The `java.lang.Iterable` received by `apply` is viewed as a Scala `Iterable` before the
 * wrapped function is invoked.
 */
final class ScalaAllWindowFunction[IN, OUT, W <: Window](
    private[this] val function: (W, Iterable[IN], Collector[OUT]) => Unit)
  extends JAllWindowFunction[IN, OUT, W] {

  @throws(classOf[Exception])
  override def apply(window: W, input: java.lang.Iterable[IN], out: Collector[OUT]): Unit = {
    val elements: Iterable[IN] = input.asScala
    function(window, elements, out)
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
import org.apache.flink.streaming.api.scala.function.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/**
 * A wrapper function that exposes a Scala AllWindowFunction as a Java AllWindowFunction.
 *
 * The Scala and Java AllWindowFunctions differ in their type of "Iterable":
 *   - Scala AllWindowFunction: scala.Iterable
 *   - Java AllWindowFunction: java.lang.Iterable
 *
 * The wrapper is itself a [[RichFunction]]. Its `open`, `close` and `setRuntimeContext`
 * calls are handed to the wrapped function through [[FunctionUtils]]; the two context
 * getters are not supported and always throw.
 *
 * @param func the Scala all-window function to delegate to
 * @tparam IN The type of the input value.
 * @tparam OUT The type of the output value.
 * @tparam W The type of Window that the wrapped function can be applied on.
 */
final class ScalaAllWindowFunctionWrapper[IN, OUT, W <: Window](
    private[this] val func: AllWindowFunction[IN, OUT, W])
  extends JAllWindowFunction[IN, OUT, W] with RichFunction {

  /** Converts the Java iterable to a Scala one and delegates to the wrapped function. */
  @throws(classOf[Exception])
  override def apply(window: W, input: java.lang.Iterable[IN], out: Collector[OUT]): Unit = {
    func.apply(window, input.asScala, out)
  }

  /** Passes the open call on to the wrapped function via [[FunctionUtils]]. */
  @throws(classOf[Exception])
  override def open(parameters: Configuration): Unit = {
    FunctionUtils.openFunction(func, parameters)
  }

  /** Passes the close call on to the wrapped function via [[FunctionUtils]]. */
  @throws(classOf[Exception])
  override def close(): Unit = {
    FunctionUtils.closeFunction(func)
  }

  /** Passes the runtime context on to the wrapped function via [[FunctionUtils]]. */
  override def setRuntimeContext(t: RuntimeContext): Unit = {
    FunctionUtils.setFunctionRuntimeContext(func, t)
  }

  // The wrapper never stores a context of its own, so the getters cannot answer.
  override def getRuntimeContext(): RuntimeContext = {
    throw new RuntimeException("This should never be called")
  }

  override def getIterationRuntimeContext(): IterationRuntimeContext = {
    throw new RuntimeException("This should never be called")
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.api.common.functions.FoldFunction
/**
 * Adapter that lets a Scala `(R, T) => R` function be used as a [[FoldFunction]].
 *
 * @param foldFunction the Scala function that combines the accumulator with the next value
 * @tparam T type of the values being folded
 * @tparam R type of the accumulator and of the fold result
 */
final class ScalaFoldFunction[T, R](private[this] val foldFunction: (R, T) => R)
  extends FoldFunction[T, R] {

  @throws(classOf[Exception])
  override def fold(accumulator: R, value: T): R =
    foldFunction.apply(accumulator, value)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.api.common.functions.ReduceFunction
/**
 * Adapter that lets a Scala `(T, T) => T` function be used as a [[ReduceFunction]].
 *
 * @param function the Scala function that combines two values into one
 * @tparam T type of the values being reduced
 */
final class ScalaReduceFunction[T](private[this] val function: (T, T) => T)
  extends ReduceFunction[T] {

  @throws(classOf[Exception])
  override def reduce(a: T, b: T): T =
    function.apply(a, b)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/**
 * Adapter that makes a Scala `(key, window, elements, collector) => Unit` function usable
 * as a Java WindowFunction.
 *
 * The `java.lang.Iterable` received by `apply` is viewed as a Scala `Iterable` before the
 * wrapped function is invoked.
 */
final class ScalaWindowFunction[IN, OUT, KEY, W <: Window](
    private[this] val function: (KEY, W, Iterable[IN], Collector[OUT]) => Unit)
  extends JWindowFunction[IN, OUT, KEY, W] {

  @throws(classOf[Exception])
  override def apply(
      key: KEY,
      window: W,
      input: java.lang.Iterable[IN],
      out: Collector[OUT]): Unit = {
    val elements: Iterable[IN] = input.asScala
    function(key, window, elements, out)
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.function.util
import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext, RichFunction}
import org.apache.flink.api.common.functions.util.FunctionUtils
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.{ WindowFunction => JWindowFunction }
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
/**
 * Wraps a Scala WindowFunction so that it can be used as a Java WindowFunction.
 *
 * The two interfaces differ in the "Iterable" they receive:
 *   - Scala WindowFunction: scala.Iterable
 *   - Java WindowFunction: java.lang.Iterable
 *
 * The wrapper is itself a [[RichFunction]]. Its `open`, `close` and `setRuntimeContext`
 * calls are handed to the wrapped function through [[FunctionUtils]]; the two context
 * getters always throw.
 */
final class ScalaWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
    private[this] val func: WindowFunction[IN, OUT, KEY, W])
  extends JWindowFunction[IN, OUT, KEY, W] with RichFunction {

  @throws(classOf[Exception])
  override def apply(
      key: KEY,
      window: W,
      input: java.lang.Iterable[IN],
      out: Collector[OUT]): Unit = {
    val elements = input.asScala
    func.apply(key, window, elements, out)
  }

  @throws(classOf[Exception])
  override def open(parameters: Configuration): Unit =
    FunctionUtils.openFunction(func, parameters)

  @throws(classOf[Exception])
  override def close(): Unit =
    FunctionUtils.closeFunction(func)

  override def setRuntimeContext(t: RuntimeContext): Unit =
    FunctionUtils.setFunctionRuntimeContext(func, t)

  // The wrapper keeps no context of its own, so both getters refuse to answer.
  override def getRuntimeContext(): RuntimeContext =
    throw new RuntimeException("This should never be called")

  override def getIterationRuntimeContext(): IterationRuntimeContext =
    throw new RuntimeException("This should never be called")
}
...@@ -20,13 +20,17 @@ package org.apache.flink.streaming.api.scala ...@@ -20,13 +20,17 @@ package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.FoldFunction
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks} import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Test import org.junit.Test
import org.junit.Assert._ import org.junit.Assert._
...@@ -87,9 +91,68 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { ...@@ -87,9 +91,68 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
} }
@Test @Test
def testFoldAllWindow(): Unit = { def testFoldWithWindowFunction(): Unit = {
WindowFoldITCase.testResults = mutable.MutableList() WindowFoldITCase.testResults = mutable.MutableList()
CheckingIdentityRichWindowFunction.reset()
val foldFunc = new FoldFunction[(String, Int), (String, Int)] {
override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
(accumulator._1 + value._1, accumulator._2 + value._2)
}
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val source1 = env.addSource(new SourceFunction[(String, Int)]() {
def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
ctx.collect(("a", 0))
ctx.collect(("a", 1))
ctx.collect(("a", 2))
ctx.collect(("b", 3))
ctx.collect(("b", 4))
ctx.collect(("b", 5))
ctx.collect(("a", 6))
ctx.collect(("a", 7))
ctx.collect(("a", 8))
// source is finite, so it will have an implicit MAX watermark when it finishes
}
def cancel() {
}
}).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)
source1
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply(
("R:", 0),
foldFunc,
new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
.addSink(new SinkFunction[(String, Int)]() {
def invoke(value: (String, Int)) {
WindowFoldITCase.testResults += value.toString
}
})
env.execute("Fold Window Test")
val expectedResult = mutable.MutableList(
"(R:aaa,3)",
"(R:aaa,21)",
"(R:bbb,12)")
assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
CheckingIdentityRichWindowFunction.checkRichMethodCalls()
}
@Test
def testFoldAllWindow(): Unit = {
WindowFoldITCase.testResults = mutable.MutableList()
val env = StreamExecutionEnvironment.getExecutionEnvironment val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1) env.setParallelism(1)
...@@ -131,6 +194,62 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { ...@@ -131,6 +194,62 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
} }
/**
 * Folds the elements of 3 ms tumbling event-time all-windows, routes the folded value
 * through a [[CheckingIdentityRichAllWindowFunction]], and checks both the collected
 * results and that the rich function's lifecycle methods were called.
 */
@Test
def testFoldAllWithWindowFunction(): Unit = {
  WindowFoldITCase.testResults = mutable.MutableList()
  CheckingIdentityRichAllWindowFunction.reset()

  // Concatenates the string parts and sums the integer parts.
  val foldFunc = new FoldFunction[(String, Int), (String, Int)] {
    override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = {
      val text = accumulator._1 + value._1
      val sum = accumulator._2 + value._2
      (text, sum)
    }
  }

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.setParallelism(1)

  val source1 = env
    .addSource(new SourceFunction[(String, Int)]() {
      def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
        val records = Seq(
          ("a", 0), ("a", 1), ("a", 2),
          ("b", 3), ("a", 3), ("b", 4),
          ("a", 4), ("b", 5), ("a", 5))
        records.foreach(record => ctx.collect(record))
        // source is finite, so it will have an implicit MAX watermark when it finishes
      }

      def cancel(): Unit = {}
    })
    .assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor)

  val folded = source1
    .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
    .apply(
      ("R:", 0),
      foldFunc,
      new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())

  folded.addSink(new SinkFunction[(String, Int)]() {
    def invoke(value: (String, Int)): Unit = {
      WindowFoldITCase.testResults += value.toString
    }
  })

  env.execute("Fold All-Window Test")

  val expectedResult = mutable.MutableList("(R:aaa,3)", "(R:bababa,24)")

  assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted)
  CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
}
} }
......
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable
/**
 * Integration tests that run [[CheckingIdentityRichWindowFunction]] and
 * [[CheckingIdentityRichAllWindowFunction]] over 3 ms tumbling event-time windows and verify
 * that every element is forwarded and that the rich lifecycle methods were called.
 */
class WindowFunctionITCase {

  @Test
  def testRichWindowFunction(): Unit = {
    WindowFunctionITCase.testResults = mutable.MutableList()
    CheckingIdentityRichWindowFunction.reset()

    val env = eventTimeEnvironment()

    testSource(env)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .apply(new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
      .addSink(resultSink())

    env.execute("RichWindowFunction Test")

    assertEquals(expectedRecords().sorted, WindowFunctionITCase.testResults.sorted)
    CheckingIdentityRichWindowFunction.checkRichMethodCalls()
  }

  @Test
  def testRichAllWindowFunction(): Unit = {
    WindowFunctionITCase.testResults = mutable.MutableList()
    CheckingIdentityRichAllWindowFunction.reset()

    val env = eventTimeEnvironment()

    testSource(env)
      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .apply(new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
      .addSink(resultSink())

    env.execute("RichAllWindowFunction Test")

    assertEquals(expectedRecords().sorted, WindowFunctionITCase.testResults.sorted)
    CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
  }

  /** Obtains the execution environment and switches it to event time with parallelism 1. */
  private def eventTimeEnvironment(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    env
  }

  /** Builds the finite nine-element source shared by both tests, with timestamps assigned. */
  private def testSource(env: StreamExecutionEnvironment): DataStream[(String, Int)] = {
    env
      .addSource(new SourceFunction[(String, Int)]() {
        def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
          val records = Seq(
            ("a", 0), ("a", 1), ("a", 2),
            ("b", 3), ("b", 4), ("b", 5),
            ("a", 6), ("a", 7), ("a", 8))
          records.foreach(record => ctx.collect(record))
          // source is finite, so it will have an implicit MAX watermark when it finishes
        }

        def cancel(): Unit = {}
      })
      .assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor)
  }

  /** Sink that appends the string form of every record to the shared result list. */
  private def resultSink(): SinkFunction[(String, Int)] = {
    new SinkFunction[(String, Int)]() {
      def invoke(value: (String, Int)): Unit = {
        WindowFunctionITCase.testResults += value.toString
      }
    }
  }

  /** The identity functions must emit exactly the records produced by the source. */
  private def expectedRecords(): mutable.MutableList[String] = {
    mutable.MutableList(
      "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)",
      "(b,3)", "(b,4)", "(b,5)")
  }
}
/**
 * Shared state and helpers for the tests in the companion class.
 */
object WindowFunctionITCase {

  /** Results collected by the test sinks; each test assigns a fresh list before running. */
  private var testResults: mutable.MutableList[String] = _

  /**
   * Uses the integer part of each tuple as its timestamp and emits, after every element,
   * a watermark one below that element's integer part.
   */
  private class Tuple2TimestampExtractor
    extends AssignerWithPunctuatedWatermarks[(String, Int)] {

    private var currentTimestamp = -1L

    override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = {
      val timestamp = element._2
      currentTimestamp = timestamp
      currentTimestamp
    }

    def checkAndGetNextWatermark(
        lastElement: (String, Int),
        extractedTimestamp: Long): Watermark = {
      val watermarkTime = lastElement._2 - 1
      new Watermark(watermarkTime)
    }
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.{ReduceFunction, FoldFunction}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.junit.Assert._
import org.junit.Test
import scala.collection.mutable
/**
 * Tests for reduce operations over windows.
 *
 * Covers keyed windows and non-keyed (all) windows, each once with a plain Scala reduce
 * function and once with a [[ReduceFunction]] pre-aggregator followed by a rich window
 * function, for which the rich lifecycle calls are verified as well.
 */
class WindowReduceITCase extends StreamingMultipleProgramsTestBase {

  @Test
  def testReduceWindow(): Unit = {
    WindowReduceITCase.testResults = mutable.MutableList()

    val env = eventTimeEnvironment()

    sourceOf(env, keyedWindowRecords)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) )
      .addSink(resultSink())

    env.execute("Reduce Window Test")

    val expectedResult = mutable.MutableList(
      "(aaa,3)",
      "(aaa,21)",
      "(bbb,12)")

    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
  }

  @Test
  def testReduceWithWindowFunction(): Unit = {
    WindowReduceITCase.testResults = mutable.MutableList()
    CheckingIdentityRichWindowFunction.reset()

    val env = eventTimeEnvironment()

    sourceOf(env, keyedWindowRecords)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .apply(
        sumReducer(),
        new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
      .addSink(resultSink())

    env.execute("Reduce Window Test")

    val expectedResult = mutable.MutableList(
      "(aaa,3)",
      "(aaa,21)",
      "(bbb,12)")

    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
    CheckingIdentityRichWindowFunction.checkRichMethodCalls()
  }

  @Test
  def testReduceAllWindow(): Unit = {
    WindowReduceITCase.testResults = mutable.MutableList()

    val env = eventTimeEnvironment()

    sourceOf(env, allWindowRecords)
      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) )
      .addSink(resultSink())

    // Job name previously read "Fold All-Window Test", copied from the fold test.
    env.execute("Reduce All-Window Test")

    val expectedResult = mutable.MutableList(
      "(aaa,3)",
      "(bababa,24)")

    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
  }

  @Test
  def testReduceAllWithWindowFunction(): Unit = {
    WindowReduceITCase.testResults = mutable.MutableList()
    CheckingIdentityRichAllWindowFunction.reset()

    val env = eventTimeEnvironment()

    sourceOf(env, allWindowRecords)
      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
      .apply(
        sumReducer(),
        new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
      .addSink(resultSink())

    // Job name previously read "Fold All-Window Test", copied from the fold test.
    env.execute("Reduce All-Window Test")

    val expectedResult = mutable.MutableList(
      "(aaa,3)",
      "(bababa,24)")

    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
    CheckingIdentityRichAllWindowFunction.checkRichMethodCalls()
  }

  /** Input for the keyed tests: each 3 ms window holds elements of a single key. */
  private def keyedWindowRecords: Seq[(String, Int)] = Seq(
    ("a", 0), ("a", 1), ("a", 2),
    ("b", 3), ("b", 4), ("b", 5),
    ("a", 6), ("a", 7), ("a", 8))

  /** Input for the all-window tests: the second 3 ms window interleaves both keys. */
  private def allWindowRecords: Seq[(String, Int)] = Seq(
    ("a", 0), ("a", 1), ("a", 2),
    ("b", 3), ("a", 3), ("b", 4),
    ("a", 4), ("b", 5), ("a", 5))

  /** Obtains the execution environment and switches it to event time with parallelism 1. */
  private def eventTimeEnvironment(): StreamExecutionEnvironment = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    env
  }

  /** Builds a finite source emitting `records` in order, with timestamps assigned. */
  private def sourceOf(
      env: StreamExecutionEnvironment,
      records: Seq[(String, Int)]): DataStream[(String, Int)] = {
    env
      .addSource(new SourceFunction[(String, Int)]() {
        def run(ctx: SourceFunction.SourceContext[(String, Int)]): Unit = {
          records.foreach(record => ctx.collect(record))
          // source is finite, so it will have an implicit MAX watermark when it finishes
        }

        def cancel(): Unit = {}
      })
      .assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
  }

  /** Reduce function that concatenates the string parts and sums the integer parts. */
  private def sumReducer(): ReduceFunction[(String, Int)] = {
    new ReduceFunction[(String, Int)] {
      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
        (a._1 + b._1, a._2 + b._2)
      }
    }
  }

  /** Sink that appends the string form of every record to the shared result list. */
  private def resultSink(): SinkFunction[(String, Int)] = {
    new SinkFunction[(String, Int)]() {
      def invoke(value: (String, Int)): Unit = {
        WindowReduceITCase.testResults += value.toString
      }
    }
  }
}
/**
 * Shared state and helpers for the tests in the companion class.
 */
object WindowReduceITCase {

  /** Results collected by the test sinks; each test assigns a fresh list before running. */
  private var testResults: mutable.MutableList[String] = _

  /**
   * Uses the integer part of each tuple as its timestamp and emits, after every element,
   * a watermark one below that element's integer part.
   */
  private class Tuple2TimestampExtractor
    extends AssignerWithPunctuatedWatermarks[(String, Int)] {

    private var currentTimestamp = -1L

    override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = {
      val timestamp = element._2
      currentTimestamp = timestamp
      currentTimestamp
    }

    def checkAndGetNextWatermark(
        lastElement: (String, Int),
        extractedTimestamp: Long): Watermark = {
      val watermarkTime = lastElement._2 - 1
      new Watermark(watermarkTime)
    }
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.testutils
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
/**
 * Test helper: an all-window function that forwards every input element unchanged and
 * records in its companion object that `open`, `close` and `setRuntimeContext` were called.
 *
 * @tparam T type of the elements, used for both input and output
 * @tparam W type of the window
 */
class CheckingIdentityRichAllWindowFunction[T, W <: Window]
  extends RichAllWindowFunction[T, T, W] {

  import CheckingIdentityRichAllWindowFunction.{closeCalled, contextSet, openCalled}

  override def apply(window: W, input: scala.Iterable[T], out: Collector[T]): Unit =
    input.foreach(value => out.collect(value))

  override def open(conf: Configuration): Unit = {
    super.open(conf)
    openCalled = true
  }

  override def close(): Unit = {
    super.close()
    closeCalled = true
  }

  override def setRuntimeContext(context: RuntimeContext): Unit = {
    super.setRuntimeContext(context)
    contextSet = true
  }
}
/**
 * Holds the flags that record which rich lifecycle methods have been called, together with
 * the means to clear and to verify them.
 */
object CheckingIdentityRichAllWindowFunction {

  @volatile
  private[CheckingIdentityRichAllWindowFunction] var closeCalled = false

  @volatile
  private[CheckingIdentityRichAllWindowFunction] var openCalled = false

  @volatile
  private[CheckingIdentityRichAllWindowFunction] var contextSet = false

  /** Clears all three flags. */
  def reset(): Unit = {
    closeCalled = false
    openCalled = false
    contextSet = false
  }

  /**
   * Throws an `AssertionError` naming the first flag that is not set, checking the context
   * flag first, then open, then close. Returns normally when all three are set.
   */
  def checkRichMethodCalls(): Unit = {
    val missing: Option[String] =
      if (!contextSet) Some("context not set")
      else if (!openCalled) Some("open() not called")
      else if (!closeCalled) Some("close() not called")
      else None

    missing.foreach(message => throw new AssertionError(message))
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.scala.testutils
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
/**
 * Test helper: a keyed window function that forwards every input element
 * unchanged and records, in flags held by the object of the same name, that
 * `setRuntimeContext`, `open` and `close` were invoked.
 *
 * @tparam T element type, used for both input and output
 * @tparam K key type
 * @tparam W window type
 */
class CheckingIdentityRichWindowFunction[T, K, W <: Window]
    extends RichWindowFunction[T, T, K, W] {

  /** Emits each element of `input` to `out`, in iteration order; `key` and `window` are not used. */
  override def apply(key: K, window: W, input: scala.Iterable[T], out: Collector[T]): Unit =
    input.foreach(element => out.collect(element))

  /** Delegates to the parent implementation, then marks `open` as called. */
  override def open(conf: Configuration): Unit = {
    super.open(conf)
    CheckingIdentityRichWindowFunction.openCalled = true
  }

  /** Delegates to the parent implementation, then marks `close` as called. */
  override def close(): Unit = {
    super.close()
    CheckingIdentityRichWindowFunction.closeCalled = true
  }

  /** Delegates to the parent implementation, then marks the runtime context as set. */
  override def setRuntimeContext(context: RuntimeContext): Unit = {
    super.setRuntimeContext(context)
    CheckingIdentityRichWindowFunction.contextSet = true
  }
}
/**
 * Holds the lifecycle flags written by the class of the same name, plus
 * helpers to clear them and to assert that all of them have been raised.
 */
object CheckingIdentityRichWindowFunction {

  // All three flags start out false and are cleared again by reset().
  @volatile
  private[CheckingIdentityRichWindowFunction] var closeCalled = false

  @volatile
  private[CheckingIdentityRichWindowFunction] var openCalled = false

  @volatile
  private[CheckingIdentityRichWindowFunction] var contextSet = false

  /** Clears all three lifecycle flags. */
  def reset(): Unit = {
    closeCalled = false
    openCalled = false
    contextSet = false
  }

  /**
   * Verifies that the runtime context was set and that `open()` and `close()`
   * were called, checking in that order and stopping at the first flag that
   * is still false.
   *
   * @throws AssertionError naming the first lifecycle step that was not recorded
   */
  def checkRichMethodCalls(): Unit = {
    // Evaluated lazily top to bottom, so later flags are only read when the
    // earlier ones are already set.
    val firstMissing: Option[String] =
      if (!contextSet) Some("context not set")
      else if (!openCalled) Some("open() not called")
      else if (!closeCalled) Some("close() not called")
      else None

    firstMissing.foreach(message => throw new AssertionError(message))
  }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册