From 7bdac1afa0e05d0c39b2521c3472c6b3eebb4f45 Mon Sep 17 00:00:00 2001
From: Stephan Ewen
Date: Fri, 4 Mar 2016 12:23:45 +0100
Subject: [PATCH] [FLINK-3578] [streaming scala] Support RichFunctions for
 Scala WindowFunction and AllWindowFunction

This also consolidates the various wrapper classes for fold() and reduce()
functions.

This closes #1765
---
 .../windowing/RichWindowFunction.java         |  17 +-
 .../InternalIterableWindowFunction.java       |   2 +-
 .../api/scala/AllWindowedStream.scala         | 111 +++----
 .../streaming/api/scala/WindowedStream.scala  |  95 ++----
 .../scala/function/AllWindowFunction.scala    |  11 +-
 .../function/RichAllWindowFunction.scala      |  37 +++
 .../scala/function/RichWindowFunction.scala   |  38 +++
 .../util/ScalaAllWindowFunction.scala         |  38 +++
 .../util/ScalaAllWindowFunctionWrapper.scala  |  68 +++++
 .../function/util/ScalaFoldFunction.scala     |  33 +++
 .../function/util/ScalaReduceFunction.scala   |  33 +++
 .../function/util/ScalaWindowFunction.scala   |  38 +++
 .../util/ScalaWindowFunctionWrapper.scala     |  68 +++++
 .../api/scala/WindowFoldITCase.scala          | 123 +++++++-
 .../api/scala/WindowFunctionITCase.scala      | 157 ++++++++++
 .../api/scala/WindowReduceITCase.scala        | 276 ++++++++++++++++++
 ...heckingIdentityRichAllWindowFunction.scala |  84 ++++++
 .../CheckingIdentityRichWindowFunction.scala  |  81 +++++
 18 files changed, 1168 insertions(+), 142 deletions(-)
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala
 create mode 100644 flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala
 create mode 100644 flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala
 create mode 100644 flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
 create mode 100644 flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala
 create mode 100644 flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
index 18ad19a709d..2ed271f31dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/RichWindowFunction.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,12 +15,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.api.functions.windowing;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+/**
+ * Rich variant of the {@link WindowFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and tear-down methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> The type of the input value.
+ * @param <OUT> The type of the output value.
+ * @param <KEY> The type of the key.
+ * @param <W> The type of {@code Window} that this window function can be applied on.
+ */
 @Public
 public abstract class RichWindowFunction<IN, OUT, KEY, W extends Window>
 		extends AbstractRichFunction implements WindowFunction<IN, OUT, KEY, W> {

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
index 32318eacd1f..7b441fb87fa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalIterableWindowFunction.java
@@ -33,7 +33,7 @@ import org.apache.flink.util.Collector;
 public final class InternalIterableWindowFunction<IN, OUT, KEY, W extends Window>
 		extends InternalWindowFunction<Iterable<IN>, OUT, KEY, W> implements RichFunction {
 
 	private static final long serialVersionUID = 1L;
 
-	protected WindowFunction<IN, OUT, KEY, W> wrappedFunction;
+	protected final WindowFunction<IN, OUT, KEY, W> wrappedFunction;
 
 	public InternalIterableWindowFunction(WindowFunction<IN, OUT, KEY, W> wrappedFunction) {
 		this.wrappedFunction = wrappedFunction;

diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index dcb98226450..020c619a0e2 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -24,15 +24,13 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWStream}
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
-import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction}
 import
org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.scala.function.util.{ScalaAllWindowFunction, ScalaAllWindowFunctionWrapper, ScalaReduceFunction, ScalaFoldFunction} import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector -import scala.collection.JavaConverters._ - /** * A [[AllWindowedStream]] represents a data stream where the stream of * elements is split into windows based on a @@ -56,7 +54,7 @@ import scala.collection.JavaConverters._ */ @Public class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { - + /** * Sets the [[Trigger]] that should be used to trigger window emission. */ @@ -121,9 +119,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { throw new NullPointerException("Reduce function must not be null.") } val cleanFun = clean(function) - val reducer = new ReduceFunction[T] { - def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } - } + val reducer = new ScalaReduceFunction[T](cleanFun) + reduce(reducer) } @@ -160,11 +157,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { throw new NullPointerException("Fold function must not be null.") } val cleanFun = clean(function) - val folder = new FoldFunction[T,R] { - def fold(acc: R, v: T) = { - cleanFun(acc, v) - } - } + val folder = new ScalaFoldFunction[T,R](cleanFun) + fold(initialValue, folder) } @@ -183,11 +177,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { function: AllWindowFunction[T, R, W]): DataStream[R] = { val cleanedFunction = clean(function) - val javaFunction = new JAllWindowFunction[T, R, W] { - def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanedFunction(window, elements.asScala, out) - } - } + val javaFunction = new ScalaAllWindowFunctionWrapper[T, R, W](cleanedFunction) + asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]])) } @@ -206,11 +197,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { val cleanedFunction = clean(function) - val applyFunction = new JAllWindowFunction[T, R, W] { - def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanedFunction(window, elements.asScala, out) - } - } + val applyFunction = new ScalaAllWindowFunction[T, R, W](cleanedFunction) + asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]])) } @@ -222,22 +210,20 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * Arriving data is pre-aggregated using the given pre-aggregation reducer. * * @param preAggregator The reduce function that is used for pre-aggregation - * @param function The window function. + * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. 
*/ def apply[R: TypeInformation]( preAggregator: ReduceFunction[T], - function: AllWindowFunction[T, R, W]): DataStream[R] = { + windowFunction: AllWindowFunction[T, R, W]): DataStream[R] = { - val cleanedFunction = clean(function) - val applyFunction = new JAllWindowFunction[T, R, W] { - def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanedFunction(window, elements.asScala, out) - } - } + val cleanedReducer = clean(preAggregator) + val cleanedWindowFunction = clean(windowFunction) + + val applyFunction = new ScalaAllWindowFunctionWrapper[T, R, W](cleanedWindowFunction) val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, returnType)) + asScalaStream(javaStream.apply(cleanedReducer, applyFunction, returnType)) } /** @@ -248,30 +234,25 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * Arriving data is pre-aggregated using the given pre-aggregation reducer. * * @param preAggregator The reduce function that is used for pre-aggregation - * @param function The window function. + * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ def apply[R: TypeInformation]( preAggregator: (T, T) => T, - function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { - if (function == null) { + windowFunction: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + + if (preAggregator == null) { throw new NullPointerException("Reduce function must not be null.") } - if (function == null) { + if (windowFunction == null) { throw new NullPointerException("WindowApply function must not be null.") } val cleanReducer = clean(preAggregator) - val reducer = new ReduceFunction[T] { - def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) } - } - - val cleanApply = clean(function) - val applyFunction = new JAllWindowFunction[T, R, W] { - def apply(window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanApply(window, input.asScala, out) - } - } + val cleanWindowFunction = clean(windowFunction) + + val reducer = new ScalaReduceFunction[T](cleanReducer) + val applyFunction = new ScalaAllWindowFunction[T, R, W](cleanWindowFunction) val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] asScalaStream(javaStream.apply(reducer, applyFunction, returnType)) @@ -286,24 +267,22 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * * @param initialValue Initial value of the fold * @param preAggregator The reduce function that is used for pre-aggregation - * @param function The window function. + * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. 
*/ def apply[R: TypeInformation]( initialValue: R, preAggregator: FoldFunction[T, R], - function: AllWindowFunction[R, R, W]): DataStream[R] = { + windowFunction: AllWindowFunction[R, R, W]): DataStream[R] = { - val cleanedFunction = clean(function) - val applyFunction = new JAllWindowFunction[R, R, W] { - def apply(window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = { - cleanedFunction(window, elements.asScala, out) - } - } + val cleanFolder = clean(preAggregator) + val cleanWindowFunction = clean(windowFunction) + + val applyFunction = new ScalaAllWindowFunctionWrapper[R, R, W](cleanWindowFunction) asScalaStream(javaStream.apply( initialValue, - clean(preAggregator), + cleanFolder, applyFunction, implicitly[TypeInformation[R]])) } @@ -317,31 +296,27 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * * @param initialValue Initial value of the fold * @param preAggregator The reduce function that is used for pre-aggregation - * @param function The window function. + * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ def apply[R: TypeInformation]( initialValue: R, preAggregator: (R, T) => R, - function: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { - if (function == null) { + windowFunction: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { + + if (preAggregator == null) { throw new NullPointerException("Reduce function must not be null.") } - if (function == null) { + if (windowFunction == null) { throw new NullPointerException("WindowApply function must not be null.") } val cleanFolder = clean(preAggregator) - val folder = new FoldFunction[T, R] { - def fold(v1: R, v2: T) = { cleanFolder(v1, v2) } - } - - val cleanApply = clean(function) - val applyFunction = new JAllWindowFunction[R, R, W] { - def apply(window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = { - cleanApply(window, input.asScala, out) - } - } + val cleanWindowFunction = clean(windowFunction) + + val folder = new ScalaFoldFunction[T, R](cleanFolder) + val applyFunction = new ScalaAllWindowFunction[R, R, W](cleanWindowFunction) + val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType)) } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index bea578eadbc..773829e1f34 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -25,14 +25,12 @@ import org.apache.flink.streaming.api.datastream.{WindowedStream => JavaWStream} import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator} import org.apache.flink.streaming.api.scala.function.WindowFunction -import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction} +import org.apache.flink.streaming.api.scala.function.util.{ScalaFoldFunction, ScalaReduceFunction, ScalaWindowFunction, ScalaWindowFunctionWrapper} import org.apache.flink.streaming.api.windowing.evictors.Evictor import org.apache.flink.streaming.api.windowing.triggers.Trigger import 
org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector -import scala.collection.JavaConverters._ - /** * A [[WindowedStream]] represents a data stream where elements are grouped by * key, and for each key, the stream of elements is split into windows based on a @@ -124,9 +122,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { throw new NullPointerException("Reduce function must not be null.") } val cleanFun = clean(function) - val reducer = new ReduceFunction[T] { - def reduce(v1: T, v2: T) = { cleanFun(v1, v2) } - } + val reducer = new ScalaReduceFunction[T](cleanFun) reduce(reducer) } @@ -163,11 +159,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { throw new NullPointerException("Fold function must not be null.") } val cleanFun = clean(function) - val folder = new FoldFunction[T,R] { - def fold(acc: R, v: T) = { - cleanFun(acc, v) - } - } + val folder = new ScalaFoldFunction[T, R](cleanFun) fold(initialValue, folder) } @@ -186,13 +178,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { function: WindowFunction[T, R, K, W]): DataStream[R] = { val cleanFunction = clean(function) - val javaFunction = new JWindowFunction[T, R, K, W] { - def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = { - cleanFunction.apply(key, window, input.asScala, out) - } - } - - asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]])) + val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanFunction) + asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]])) } /** @@ -213,11 +200,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { } val cleanedFunction = clean(function) - val applyFunction = new JWindowFunction[T, R, K, W] { - def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanedFunction(key, window, elements.asScala, out) - } - } + val applyFunction = new ScalaWindowFunction[T, R, K, W](cleanedFunction) + asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]])) } @@ -236,16 +220,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R] = { - val cleanedFunction = clean(function) + val cleanedPreAggregator = clean(preAggregator) + val cleanedWindowFunction = clean(function) - val applyFunction = new JWindowFunction[T, R, K, W] { - def apply(key: K, window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanedFunction.apply(key, window, elements.asScala, out) - } - } + val applyFunction = new ScalaWindowFunctionWrapper[T, R, K, W](cleanedWindowFunction) val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(clean(preAggregator), applyFunction, resultType)) + asScalaStream(javaStream.apply(cleanedPreAggregator, applyFunction, resultType)) } /** @@ -256,31 +237,25 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * Arriving data is pre-aggregated using the given pre-aggregation reducer. * * @param preAggregator The reduce function that is used for pre-aggregation - * @param function The window function. + * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. 
*/ def apply[R: TypeInformation]( preAggregator: (T, T) => T, - function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { + windowFunction: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = { - if (function == null) { + if (preAggregator == null) { throw new NullPointerException("Reduce function must not be null.") } - if (function == null) { + if (windowFunction == null) { throw new NullPointerException("WindowApply function must not be null.") } val cleanReducer = clean(preAggregator) - val reducer = new ReduceFunction[T] { - def reduce(v1: T, v2: T) = { cleanReducer(v1, v2) } - } - - val cleanApply = clean(function) - val applyFunction = new JWindowFunction[T, R, K, W] { - def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]): Unit = { - cleanApply(key, window, input.asScala, out) - } - } + val cleanWindowFunction = clean(windowFunction) + + val reducer = new ScalaReduceFunction[T](cleanReducer) + val applyFunction = new ScalaWindowFunction[T, R, K, W](cleanWindowFunction) asScalaStream(javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])) } @@ -303,16 +278,13 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { function: WindowFunction[R, R, K, W]): DataStream[R] = { val cleanedFunction = clean(function) + val cleanedFoldFunction = clean(foldFunction) - val applyFunction = new JWindowFunction[R, R, K, W] { - def apply(key: K, window: W, elements: java.lang.Iterable[R], out: Collector[R]): Unit = { - cleanedFunction.apply(key, window, elements.asScala, out) - } - } + val applyFunction = new ScalaWindowFunctionWrapper[R, R, K, W](cleanedFunction) asScalaStream(javaStream.apply( initialValue, - clean(foldFunction), + cleanedFoldFunction, applyFunction, implicitly[TypeInformation[R]])) } @@ -325,32 +297,27 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * Arriving data is incrementally aggregated using the given fold function. * * @param foldFunction The fold function that is used for incremental aggregation - * @param function The window function. + * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. 
*/ def apply[R: TypeInformation]( initialValue: R, foldFunction: (R, T) => R, - function: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { + windowFunction: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { - if (function == null) { + if (foldFunction == null) { throw new NullPointerException("Fold function must not be null.") } - if (function == null) { + if (windowFunction == null) { throw new NullPointerException("WindowApply function must not be null.") } val cleanFolder = clean(foldFunction) - val folder = new FoldFunction[T, R] { - def fold(acc: R, v: T) = { cleanFolder(acc, v) } - } - - val cleanApply = clean(function) - val applyFunction = new JWindowFunction[R, R, K, W] { - def apply(key: K, window: W, input: java.lang.Iterable[R], out: Collector[R]): Unit = { - cleanApply(key, window, input.asScala, out) - } - } + val cleanWindowFunction = clean(windowFunction) + + val folder = new ScalaFoldFunction[T, R](cleanFolder) + val applyFunction = new ScalaWindowFunction[R, R, K, W](cleanWindowFunction) + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType)) } diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala index 4e77d8366f0..7677459526e 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/AllWindowFunction.scala @@ -25,11 +25,12 @@ import org.apache.flink.streaming.api.windowing.windows.Window import org.apache.flink.util.Collector /** - * Base interface for functions that are evaluated over keyed (grouped) windows. - * - * @tparam IN The type of the input value. - * @tparam OUT The type of the output value. - */ + * Base interface for functions that are evaluated over non-grouped windows, + * i.e., windows over all stream partitions. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + */ @Public trait AllWindowFunction[IN, OUT, W <: Window] extends Function with Serializable { diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala new file mode 100644 index 00000000000..4ff1fe9a93c --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichAllWindowFunction.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function + +import org.apache.flink.api.common.functions.AbstractRichFunction +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Rich variant of the [[org.apache.flink.streaming.api.scala.function.AllWindowFunction]]. + * + * As a [[org.apache.flink.api.common.functions.RichFunction]], it gives access to the + * [[org.apache.flink.api.common.functions.RuntimeContext]] and provides setup + * and tear-down methods. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam W The type of Window that this window function can be applied on. + */ +abstract class RichAllWindowFunction[IN, OUT, W <: Window] + extends AbstractRichFunction + with AllWindowFunction[IN, OUT, W] {} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala new file mode 100644 index 00000000000..72d44e77867 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/RichWindowFunction.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function + +import org.apache.flink.api.common.functions.AbstractRichFunction +import org.apache.flink.streaming.api.windowing.windows.Window + +/** + * Rich variant of the [[org.apache.flink.streaming.api.scala.function.WindowFunction]]. + * + * As a [[org.apache.flink.api.common.functions.RichFunction]], it gives access to the + * [[org.apache.flink.api.common.functions.RuntimeContext]] and provides setup + * and tear-down methods. + * + * @tparam IN The type of the input value. + * @tparam OUT The type of the output value. + * @tparam KEY The type of the key. + * @tparam W The type of Window that this window function can be applied on. + */ +abstract class RichWindowFunction[IN, OUT, KEY, W <: Window] + extends AbstractRichFunction + with WindowFunction[IN, OUT, KEY, W] {} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala new file mode 100644 index 00000000000..943e83f597b --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunction.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function.util + +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction} +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +import scala.collection.JavaConverters._ + +/** + * A wrapper function that exposes a Scala Function3 as a Java AllWindowFunction. + */ +final class ScalaAllWindowFunction[IN, OUT, W <: Window]( + private[this] val function: (W, Iterable[IN], Collector[OUT]) => Unit) + extends JAllWindowFunction[IN, OUT, W] { + + @throws(classOf[Exception]) + override def apply(window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) { + function.apply(window, input.asScala, out) + } +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala new file mode 100644 index 00000000000..39142c298dd --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaAllWindowFunctionWrapper.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function.util + +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.api.common.functions.{IterationRuntimeContext, RichFunction, RuntimeContext} +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction => JAllWindowFunction} +import org.apache.flink.streaming.api.scala.function.AllWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +import scala.collection.JavaConverters._ + +/** + * A wrapper function that exposes a Scala WindowFunction as a JavaWindow function. 
+ * + * The Scala and Java Window functions differ in their type of "Iterable": + * - Scala WindowFunction: scala.Iterable + * - Java WindowFunction: java.lang.Iterable + */ +final class ScalaAllWindowFunctionWrapper[IN, OUT, W <: Window]( + private[this] val func: AllWindowFunction[IN, OUT, W]) + extends JAllWindowFunction[IN, OUT, W] with RichFunction { + + @throws(classOf[Exception]) + override def apply(window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) { + func.apply(window, input.asScala, out) + } + + @throws(classOf[Exception]) + override def open(parameters: Configuration) { + FunctionUtils.openFunction(func, parameters) + } + + @throws(classOf[Exception]) + override def close() { + FunctionUtils.closeFunction(func) + } + + override def setRuntimeContext(t: RuntimeContext) { + FunctionUtils.setFunctionRuntimeContext(func, t) + } + + override def getRuntimeContext(): RuntimeContext = { + throw new RuntimeException("This should never be called") + } + + override def getIterationRuntimeContext(): IterationRuntimeContext = { + throw new RuntimeException("This should never be called") + } +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala new file mode 100644 index 00000000000..f85ddd09953 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaFoldFunction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function.util + +import org.apache.flink.api.common.functions.FoldFunction + +/** + * A wrapper function that exposes a Scala Function2 as a [[FoldFunction]]. + */ +final class ScalaFoldFunction[T, R](private[this] val foldFunction: (R, T) => R) + extends FoldFunction[T, R] { + + @throws(classOf[Exception]) + override def fold(accumulator: R, value: T): R = { + foldFunction(accumulator, value) + } +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala new file mode 100644 index 00000000000..027ccb257ec --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaReduceFunction.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function.util + +import org.apache.flink.api.common.functions.ReduceFunction + +/** + * A wrapper function that exposes a Scala Function2 as a [[ReduceFunction]]. + */ +final class ScalaReduceFunction[T](private[this] val function: (T, T) => T) + extends ReduceFunction[T] { + + @throws(classOf[Exception]) + override def reduce(a: T, b: T): T = { + function(a, b) + } +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala new file mode 100644 index 00000000000..3b4cb9d459d --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunction.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function.util + +import org.apache.flink.streaming.api.functions.windowing.{WindowFunction => JWindowFunction} +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +import scala.collection.JavaConverters._ + +/** + * A wrapper function that exposes a Scala Function4 as a Java WindowFunction. 
+ */ +final class ScalaWindowFunction[IN, OUT, KEY, W <: Window]( + private[this] val function: (KEY, W, Iterable[IN], Collector[OUT]) => Unit) + extends JWindowFunction[IN, OUT, KEY, W] { + + @throws(classOf[Exception]) + override def apply(key: KEY, window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) { + function.apply(key, window, input.asScala, out) + } +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala new file mode 100644 index 00000000000..1d74b6cf0b1 --- /dev/null +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaWindowFunctionWrapper.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala.function.util + +import org.apache.flink.api.common.functions.{IterationRuntimeContext, RuntimeContext, RichFunction} +import org.apache.flink.api.common.functions.util.FunctionUtils +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.windowing.{ WindowFunction => JWindowFunction } +import org.apache.flink.streaming.api.scala.function.WindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + +import scala.collection.JavaConverters._ + +/** + * A wrapper function that exposes a Scala WindowFunction as a JavaWindow function. 
+ * + * The Scala and Java Window functions differ in their type of "Iterable": + * - Scala WindowFunction: scala.Iterable + * - Java WindowFunction: java.lang.Iterable + */ +final class ScalaWindowFunctionWrapper[IN, OUT, KEY, W <: Window]( + private[this] val func: WindowFunction[IN, OUT, KEY, W]) + extends JWindowFunction[IN, OUT, KEY, W] with RichFunction { + + @throws(classOf[Exception]) + override def apply(key: KEY, window: W, input: java.lang.Iterable[IN], out: Collector[OUT]) { + func.apply(key, window, input.asScala, out) + } + + @throws(classOf[Exception]) + override def open(parameters: Configuration) { + FunctionUtils.openFunction(func, parameters) + } + + @throws(classOf[Exception]) + override def close() { + FunctionUtils.closeFunction(func) + } + + override def setRuntimeContext(t: RuntimeContext) { + FunctionUtils.setFunctionRuntimeContext(func, t) + } + + override def getRuntimeContext(): RuntimeContext = { + throw new RuntimeException("This should never be called") + } + + override def getIterationRuntimeContext(): IterationRuntimeContext = { + throw new RuntimeException("This should never be called") + } +} diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala index 78336515279..6a6a9562e31 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFoldITCase.scala @@ -20,13 +20,17 @@ package org.apache.flink.streaming.api.scala import java.util.concurrent.TimeUnit +import org.apache.flink.api.common.functions.FoldFunction +import org.apache.flink.api.java.tuple.Tuple import org.apache.flink.streaming.api.TimeCharacteristic -import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks} +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks import org.apache.flink.streaming.api.functions.sink.SinkFunction import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichWindowFunction, CheckingIdentityRichAllWindowFunction} import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase import org.junit.Test import org.junit.Assert._ @@ -87,9 +91,68 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { } @Test - def testFoldAllWindow(): Unit = { + def testFoldWithWindowFunction(): Unit = { WindowFoldITCase.testResults = mutable.MutableList() + CheckingIdentityRichWindowFunction.reset() + val foldFunc = new FoldFunction[(String, Int), (String, Int)] { + override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = { + (accumulator._1 + value._1, accumulator._2 + value._2) + } + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 
3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor) + + source1 + .keyBy(0) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply( + ("R:", 0), + foldFunc, + new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFoldITCase.testResults += value.toString + } + }) + + env.execute("Fold Window Test") + + val expectedResult = mutable.MutableList( + "(R:aaa,3)", + "(R:aaa,21)", + "(R:bbb,12)") + + assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) + + CheckingIdentityRichWindowFunction.checkRichMethodCalls() + } + + @Test + def testFoldAllWindow(): Unit = { + WindowFoldITCase.testResults = mutable.MutableList() + val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) env.setParallelism(1) @@ -131,6 +194,62 @@ class WindowFoldITCase extends StreamingMultipleProgramsTestBase { assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) } + @Test + def testFoldAllWithWindowFunction(): Unit = { + WindowFoldITCase.testResults = mutable.MutableList() + CheckingIdentityRichAllWindowFunction.reset() + + val foldFunc = new FoldFunction[(String, Int), (String, Int)] { + override def fold(accumulator: (String, Int), value: (String, Int)): (String, Int) = { + (accumulator._1 + value._1, accumulator._2 + value._2) + } + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("a", 3)) + ctx.collect(("b", 4)) + ctx.collect(("a", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 5)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + }).assignTimestampsAndWatermarks(new WindowFoldITCase.Tuple2TimestampExtractor) + + source1 + .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply( + ("R:", 0), + foldFunc, + new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFoldITCase.testResults += value.toString + } + }) + + env.execute("Fold All-Window Test") + + val expectedResult = mutable.MutableList( + "(R:aaa,3)", + "(R:bababa,24)") + + assertEquals(expectedResult.sorted, WindowFoldITCase.testResults.sorted) + + CheckingIdentityRichAllWindowFunction.checkRichMethodCalls() + } } diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala new file mode 100644 index 00000000000..c38f422afa0 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowFunctionITCase.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.scala + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction} +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +class WindowFunctionITCase { + + @Test + def testRichWindowFunction(): Unit = { + WindowFunctionITCase.testResults = mutable.MutableList() + CheckingIdentityRichWindowFunction.reset() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() {} + + }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor) + + source1 + .keyBy(0) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFunctionITCase.testResults += value.toString + } + }) + + env.execute("RichWindowFunction Test") + + val expectedResult = mutable.MutableList( + "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)", + "(b,3)", "(b,4)", "(b,5)") + + assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted) + + CheckingIdentityRichWindowFunction.checkRichMethodCalls() + } + + @Test + def testRichAllWindowFunction(): Unit = { + WindowFunctionITCase.testResults = mutable.MutableList() + CheckingIdentityRichAllWindowFunction.reset() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: 
SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() {} + + }).assignTimestampsAndWatermarks(new WindowFunctionITCase.Tuple2TimestampExtractor) + + source1 + .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .apply(new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]()) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowFunctionITCase.testResults += value.toString + } + }) + + env.execute("RichAllWindowFunction Test") + + val expectedResult = mutable.MutableList( + "(a,0)", "(a,1)", "(a,2)", "(a,6)", "(a,7)", "(a,8)", + "(b,3)", "(b,4)", "(b,5)") + + assertEquals(expectedResult.sorted, WindowFunctionITCase.testResults.sorted) + + CheckingIdentityRichAllWindowFunction.checkRichMethodCalls() + } +} + +object WindowFunctionITCase { + + private var testResults: mutable.MutableList[String] = null + + private class Tuple2TimestampExtractor extends AssignerWithPunctuatedWatermarks[(String, Int)] { + + private var currentTimestamp = -1L + + override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = { + currentTimestamp = element._2 + currentTimestamp + } + + def checkAndGetNextWatermark( + lastElement: (String, Int), + extractedTimestamp: Long): Watermark = { + new Watermark(lastElement._2 - 1) + } + } +} diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala new file mode 100644 index 00000000000..ffd94fc6328 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala + +import java.util.concurrent.TimeUnit + +import org.apache.flink.api.common.functions.{ReduceFunction, FoldFunction} +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.streaming.api.TimeCharacteristic +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.functions.sink.SinkFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction +import org.apache.flink.streaming.api.scala.testutils.{CheckingIdentityRichAllWindowFunction, CheckingIdentityRichWindowFunction} +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase + +import org.junit.Assert._ +import org.junit.Test + +import scala.collection.mutable + +/** + * Tests for Folds over windows. These also test whether OutputTypeConfigurable functions + * work for windows, because FoldWindowFunction is OutputTypeConfigurable. + */ +class WindowReduceITCase extends StreamingMultipleProgramsTestBase { + + @Test + def testReduceWindow(): Unit = { + WindowReduceITCase.testResults = mutable.MutableList() + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor) + + source1 + .keyBy(0) + .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS))) + .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) ) + .addSink(new SinkFunction[(String, Int)]() { + def invoke(value: (String, Int)) { + WindowReduceITCase.testResults += value.toString + } + }) + + env.execute("Reduce Window Test") + + val expectedResult = mutable.MutableList( + "(aaa,3)", + "(aaa,21)", + "(bbb,12)") + + assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted) + } + + @Test + def testReduceWithWindowFunction(): Unit = { + WindowReduceITCase.testResults = mutable.MutableList() + CheckingIdentityRichWindowFunction.reset() + + val reduceFunc = new ReduceFunction[(String, Int)] { + override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = { + (a._1 + b._1, a._2 + b._2) + } + } + + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + env.setParallelism(1) + + val source1 = env.addSource(new SourceFunction[(String, Int)]() { + def run(ctx: SourceFunction.SourceContext[(String, Int)]) { + ctx.collect(("a", 0)) + ctx.collect(("a", 1)) + ctx.collect(("a", 2)) + ctx.collect(("b", 3)) + ctx.collect(("b", 4)) + ctx.collect(("b", 5)) + ctx.collect(("a", 6)) + ctx.collect(("a", 7)) + ctx.collect(("a", 8)) + + // source is finite, so it will have an implicit MAX watermark when it finishes + } + + def cancel() { + } + 
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    source1
+      .keyBy(0)
+      .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(
+        reduceFunc,
+        new CheckingIdentityRichWindowFunction[(String, Int), Tuple, TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(aaa,21)",
+      "(bbb,12)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
+    CheckingIdentityRichWindowFunction.checkRichMethodCalls()
+  }
+
+  @Test
+  def testReduceAllWindow(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .reduce( (a, b) => (a._1 + b._1, a._2 + b._2) )
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce All-Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+  }
+
+  @Test
+  def testReduceAllWithWindowFunction(): Unit = {
+    WindowReduceITCase.testResults = mutable.MutableList()
+    CheckingIdentityRichAllWindowFunction.reset()
+
+    val reduceFunc = new ReduceFunction[(String, Int)] {
+      override def reduce(a: (String, Int), b: (String, Int)): (String, Int) = {
+        (a._1 + b._1, a._2 + b._2)
+      }
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    env.setParallelism(1)
+
+    val source1 = env.addSource(new SourceFunction[(String, Int)]() {
+      def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
+        ctx.collect(("a", 0))
+        ctx.collect(("a", 1))
+        ctx.collect(("a", 2))
+        ctx.collect(("b", 3))
+        ctx.collect(("a", 3))
+        ctx.collect(("b", 4))
+        ctx.collect(("a", 4))
+        ctx.collect(("b", 5))
+        ctx.collect(("a", 5))
+
+        // source is finite, so it will have an implicit MAX watermark when it finishes
+      }
+
+      def cancel() {
+      }
+    }).assignTimestampsAndWatermarks(new WindowReduceITCase.Tuple2TimestampExtractor)
+
+    source1
+      .windowAll(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
+      .apply(
+        reduceFunc,
+        new CheckingIdentityRichAllWindowFunction[(String, Int), TimeWindow]())
+      .addSink(new SinkFunction[(String, Int)]() {
+        def invoke(value: (String, Int)) {
+          WindowReduceITCase.testResults += value.toString
+        }
+      })
+
+    env.execute("Reduce All-Window Test")
+
+    val expectedResult = mutable.MutableList(
+      "(aaa,3)",
+      "(bababa,24)")
+
+    assertEquals(expectedResult.sorted, WindowReduceITCase.testResults.sorted)
+
CheckingIdentityRichAllWindowFunction.checkRichMethodCalls() + } +} + +object WindowReduceITCase { + + private var testResults: mutable.MutableList[String] = null + + private class Tuple2TimestampExtractor extends AssignerWithPunctuatedWatermarks[(String, Int)] { + + private var currentTimestamp = -1L + + override def extractTimestamp(element: (String, Int), previousTimestamp: Long): Long = { + currentTimestamp = element._2 + currentTimestamp + } + + def checkAndGetNextWatermark( + lastElement: (String, Int), + extractedTimestamp: Long): Watermark = { + new Watermark(lastElement._2 - 1) + } + } +} + + + diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala new file mode 100644 index 00000000000..acd8b12b885 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichAllWindowFunction.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala.testutils + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.scala.function.RichAllWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + + +class CheckingIdentityRichAllWindowFunction[T, W <: Window] + extends RichAllWindowFunction[T, T, W] { + + override def apply(window: W, input: scala.Iterable[T], out: Collector[T]): Unit = { + for (value <- input) { + out.collect(value) + } + } + + override def open(conf: Configuration): Unit = { + super.open(conf) + CheckingIdentityRichAllWindowFunction.openCalled = true + } + + override def close(): Unit = { + super.close() + CheckingIdentityRichAllWindowFunction.closeCalled = true + } + + override def setRuntimeContext(context: RuntimeContext): Unit = { + super.setRuntimeContext(context) + CheckingIdentityRichAllWindowFunction.contextSet = true + } +} + + +object CheckingIdentityRichAllWindowFunction { + + @volatile + private[CheckingIdentityRichAllWindowFunction] var closeCalled = false + + @volatile + private[CheckingIdentityRichAllWindowFunction] var openCalled = false + + @volatile + private[CheckingIdentityRichAllWindowFunction] var contextSet = false + + def reset(): Unit = { + closeCalled = false + openCalled = false + contextSet = false + } + + def checkRichMethodCalls(): Unit = { + if (!contextSet) { + throw new AssertionError("context not set") + } + if (!openCalled) { + throw new AssertionError("open() not called") + } + if (!closeCalled) { + throw new AssertionError("close() not called") + } + } +} + + diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala new file mode 100644 index 00000000000..e0de0de3a03 --- /dev/null +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/testutils/CheckingIdentityRichWindowFunction.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.scala.testutils + +import org.apache.flink.api.common.functions.RuntimeContext +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.scala.function.RichWindowFunction +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector + + +class CheckingIdentityRichWindowFunction[T, K, W <: Window] + extends RichWindowFunction[T, T, K, W] { + + override def apply(key: K, window: W, input: scala.Iterable[T], out: Collector[T]): Unit = { + for (value <- input) { + out.collect(value) + } + } + + override def open(conf: Configuration): Unit = { + super.open(conf) + CheckingIdentityRichWindowFunction.openCalled = true + } + + override def close(): Unit = { + super.close() + CheckingIdentityRichWindowFunction.closeCalled = true + } + + override def setRuntimeContext(context: RuntimeContext): Unit = { + super.setRuntimeContext(context) + CheckingIdentityRichWindowFunction.contextSet = true + } +} + +object CheckingIdentityRichWindowFunction { + + @volatile + private[CheckingIdentityRichWindowFunction] var closeCalled = false + + @volatile + private[CheckingIdentityRichWindowFunction] var openCalled = false + + @volatile + private[CheckingIdentityRichWindowFunction] var contextSet = false + + def reset(): Unit = { + closeCalled = false + openCalled = false + contextSet = false + } + + def checkRichMethodCalls(): Unit = { + if (!contextSet) { + throw new AssertionError("context not set") + } + if (!openCalled) { + throw new AssertionError("open() not called") + } + if (!closeCalled) { + throw new AssertionError("close() not called") + } + } +} -- GitLab
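Usage sketch (illustrative only, not part of the patch): with the new Scala RichWindowFunction in place, user code can go through WindowedStream.apply(...) with a rich function and rely on open()/close() and getRuntimeContext(), which the ITCases above check are forwarded through the new ScalaWindowFunctionWrapper. The element type, key, window choice, and the names CountPerWindow / RichWindowFunctionUsageSketch below are made up for the example; a count window is used only to keep the sketch independent of timestamps and watermarks.

import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

// Hypothetical rich window function: emits (key, element count) per fired window and
// uses open()/close() for per-task setup and tear-down.
class CountPerWindow extends RichWindowFunction[(String, Int), (String, Int), Tuple, GlobalWindow] {

  override def open(parameters: Configuration): Unit = {
    // acquire resources here; getRuntimeContext is available because this is a RichFunction
  }

  override def close(): Unit = {
    // release whatever open() acquired
  }

  override def apply(
      key: Tuple,
      window: GlobalWindow,
      input: Iterable[(String, Int)],
      out: Collector[(String, Int)]): Unit = {
    out.collect((input.head._1, input.size))
  }
}

object RichWindowFunctionUsageSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 2), ("b", 3))
      .keyBy(0)                  // key type is the Java Tuple, as in the ITCases above
      .countWindow(3)            // GlobalWindow-based count window, no timestamps needed
      .apply(new CountPerWindow) // rich function wrapped by the new Scala wrapper
      .print()

    env.execute("RichWindowFunction usage sketch")
  }
}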