diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
index 86b9044c6dee96d6e73ce3b0d5778e4b2485c878..091d89366dae2bd157040cf04ac0115618743baf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/scala/table/ScalaStreamingTranslator.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.java.table.JavaStreamingTranslator
 import org.apache.flink.api.table.Table
 import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.expressions.Expression
-import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
+import org.apache.flink.streaming.api.scala.{DataStream, asScalaStream}
 
 /**
  * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
@@ -41,7 +41,7 @@ class ScalaStreamingTranslator extends PlanTranslator {
 
   override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
     // fake it till you make it ...
-    javaToScalaStream(javaTranslator.translate(op))
+    asScalaStream(javaTranslator.translate(op))
   }
 
   override def createTable[A](
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 9e2bc5d4f1d5858446b84a594c4bbfbc391ea214..e921940f531c8697d0b7a926b291d36b203e9bf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -321,8 +321,8 @@ public class CoGroupedStreams<T1, T2> {
 	private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
 		private static final long serialVersionUID = 1L;
 
-		TypeInformation<T1> oneType;
-		TypeInformation<T2> twoType;
+		private final TypeInformation<T1> oneType;
+		private final TypeInformation<T2> twoType;
 
 		public UnionTypeInfo(TypeInformation<T1> oneType, TypeInformation<T2> twoType) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index b340e6ed89621c2598d9375a868fbca00927755b..9da9e34f8e17041fd602bb958ecc6287c9829fd3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -31,10 +31,21 @@ import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
+import static java.util.Objects.requireNonNull;
+
 /**
- * {@code ConnectedStreams} represents two connected streams of (possible) different data types. It
- * can be used to apply transformations such as {@link CoMapFunction} on two
- * {@link DataStream DataStreams}
+ * ConnectedStreams represent two connected streams of (possibly) different data types.
+ * Connected streams are useful for cases where operations on one stream directly
+ * affect the operations on the other stream, usually via shared state between the streams.
+ *
+ * <p>An example for the use of connected streams would be to apply rules that change over time
+ * onto another stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the
+ * current set of rules in the state. It may receive either a rule update and update the state
+ * or a data element and apply the rules in the state to the element.
+ *
+ * <p>The connected stream can be conceptually viewed as a union stream of an Either type, that
+ * holds either the first stream's type or the second stream's type.
  *
  * @param <IN1> Type of the first input data stream.
  * @param <IN2> Type of the second input data stream.
@@ -42,20 +53,14 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 @Public
 public class ConnectedStreams<IN1, IN2> {
 
-	protected StreamExecutionEnvironment environment;
-	protected DataStream<IN1> inputStream1;
-	protected DataStream<IN2> inputStream2;
+	protected final StreamExecutionEnvironment environment;
+	protected final DataStream<IN1> inputStream1;
+	protected final DataStream<IN2> inputStream2;
 
-	protected ConnectedStreams(StreamExecutionEnvironment env,
-			DataStream<IN1> input1,
-			DataStream<IN2> input2) {
-		this.environment = env;
-		if (input1 != null) {
-			this.inputStream1 = input1;
-		}
-		if (input2 != null) {
-			this.inputStream2 = input2;
-		}
+	protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
+		this.environment = requireNonNull(env);
+		this.inputStream1 = requireNonNull(input1);
+		this.inputStream2 = requireNonNull(input2);
 	}
 
 	public StreamExecutionEnvironment getExecutionEnvironment() {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index a44b65051f93d8f2006220fbb4f33fd1b7880d6c..07a91e9b09d24522e469380ade8e75e1ff717ad5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -355,7 +355,7 @@ public class DataStream<T> {
 	 * Note: This method works only on single field keys.
 	 *
 	 * @param partitioner The partitioner to assign partitions to keys.
-	 * @param field The field index on which the DataStream is to partitioned.
+	 * @param field The expression for the field on which the DataStream is to be partitioned.
 	 * @return The partitioned DataStream.
 	 */
 	public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
index 03ce2f59bff883f64b6791a10279765bab411ffb..3f996f5cc6e8346d8d75d244118445da1d68c26a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoFlatMapFunction.java
@@ -25,20 +25,44 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.util.Collector;
 
 /**
- * A CoFlatMapFunction represents a FlatMap transformation with two different
- * input types.
+ * A CoFlatMapFunction implements a flat-map transformation over two
+ * connected streams.
+ *
+ * <p>The same instance of the transformation function is used to transform
+ * both of the connected streams. That way, the stream transformations can
+ * share state.
+ *
+ * <p>An example for the use of connected streams would be to apply rules that change over time
+ * onto elements of a stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the
+ * current set of rules in the state. It may receive either a rule update (from the first stream)
+ * and update the state, or a data element (from the second stream) and apply the rules in the
+ * state to the element. The result of applying the rules would be emitted.
  *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
  */
 @Public
 public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
 
+	/**
+	 * This method is called for each element in the first of the connected streams.
+	 *
+	 * @param value The stream element
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
 
+	/**
+	 * This method is called for each element in the second of the connected streams.
+	 *
+	 * @param value The stream element
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
index 6a3b4e055c0d2a7a37778bac65b5ebe07a032798..71fcbc8400d285a9f1adbb978a97394fae9e4af0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoMapFunction.java
@@ -24,20 +24,37 @@ import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.functions.Function;
 
 /**
- * A CoMapFunction represents a Map transformation with two different input
- * types.
+ * A CoMapFunction implements a map() transformation over two
+ * connected streams.
  *
- * @param <IN1>
- *            Type of the first input.
- * @param <IN2>
- *            Type of the second input.
- * @param <OUT>
- *            Output type.
+ * <p>The same instance of the transformation function is used to transform
+ * both of the connected streams. That way, the stream transformations can
+ * share state.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
  */
 @Public
 public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
 
+	/**
+	 * This method is called for each element in the first of the connected streams.
+	 *
+	 * @param value The stream element
+	 * @return The resulting element
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	OUT map1(IN1 value) throws Exception;
 
+	/**
+	 * This method is called for each element in the second of the connected streams.
+	 *
+	 * @param value The stream element
+	 * @return The resulting element
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
 	OUT map2(IN2 value) throws Exception;
 }
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index e8d3e0536334704be708bd5ae1e467f6c9f3d0df..a4df98052f5982ede39032d6a13e2f7413bba4a3 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -99,7 +99,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
    * @return The data stream that is the result of applying the reduce function to the window.
    */
   def reduce(function: ReduceFunction[T]): DataStream[T] = {
-    javaStream.reduce(clean(function))
+    asScalaStream(javaStream.reduce(clean(function)))
   }
 
   /**
@@ -139,13 +139,13 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def fold[R: TypeInformation: ClassTag](
       initialValue: R,
       function: FoldFunction[T,R]): DataStream[R] = {
+
     if (function == null) {
       throw new NullPointerException("Fold function must not be null.")
     }
 
     val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
-
-    javaStream.fold(initialValue, function, resultType)
+    asScalaStream(javaStream.fold(initialValue, function, resultType))
   }
 
   /**
@@ -182,13 +182,14 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def apply[R: TypeInformation: ClassTag](
      function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
+
     val cleanedFunction = clean(function)
     val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
     }
-    javaStream.apply(javaFunction, implicitly[TypeInformation[R]])
+    asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -204,13 +205,14 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def apply[R: TypeInformation: ClassTag](
      function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
+
     val cleanedFunction = clean(function)
     val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
       def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
         cleanedFunction(window, elements.asScala, out)
       }
     }
-    javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
+    asScalaStream(javaStream.apply(applyFunction,
+      implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -227,7 +229,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
   def apply[R: TypeInformation: ClassTag](
       preAggregator: ReduceFunction[T],
       function: AllWindowFunction[T, R, W]): DataStream[R] = {
-    javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(clean(preAggregator), clean(function), returnType))
   }
 
   /**
@@ -262,7 +266,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
         cleanApply(window, input, out)
       }
     }
-    javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
+
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(reducer, applyFunction, returnType))
   }
 
   /**
@@ -281,11 +287,12 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
       initialValue: R,
       preAggregator: FoldFunction[T, R],
       function: AllWindowFunction[R, R, W]): DataStream[R] = {
-    javaStream.apply(
+
+    asScalaStream(javaStream.apply(
       initialValue,
       clean(preAggregator),
       clean(function),
-      implicitly[TypeInformation[R]])
+      implicitly[TypeInformation[R]]))
   }
 
   /**
@@ -322,7 +329,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
         cleanApply(window, input, out)
       }
     }
-    javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]])
+    val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
+    asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType))
   }
 
   // ------------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index ce96e4fc652b4db7667d6b96a595968f7ab19787..7572885f5c99d9ffd4d91defd10a6552b692bec2 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -278,13 +278,13 @@ object CoGroupedStreams {
 
       val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream)
 
-      coGroup
+      asScalaStream(coGroup
         .where(keySelector1)
         .equalTo(keySelector2)
         .window(windowAssigner)
         .trigger(trigger)
         .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]])
+        .apply(clean(function), implicitly[TypeInformation[T]]))
     }
 
     /**
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index c3ea144e095792587bcb34df9cd30daaea76e26d..278090d109b79b0956eb318ec36b9ce36bd6c525 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -29,27 +29,40 @@ import org.apache.flink.util.Collector
 
 import scala.reflect.ClassTag
 
 /**
- * [[ConnectedStreams]] represents two connected streams of (possible) different data types. It
- * can be used to apply transformations such as [[CoMapFunction]] on two
- * [[DataStream]]s.
+ * [[ConnectedStreams]] represents two connected streams of (possibly) different data types.
+ * Connected streams are useful for cases where operations on one stream directly
+ * affect the operations on the other stream, usually via shared state between the streams.
+ *
+ * An example for the use of connected streams would be to apply rules that change over time
+ * onto another stream. One of the connected streams has the rules, the other stream the
+ * elements to apply the rules to. The operation on the connected stream maintains the
+ * current set of rules in the state. It may receive either a rule update and update the state
+ * or a data element and apply the rules in the state to the element.
+ *
+ * The connected stream can be conceptually viewed as a union stream of an Either type, that
+ * holds either the first stream's type or the second stream's type.
  */
 @Public
 class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
 
+  // ------------------------------------------------------
+  //  Transformations
+  // ------------------------------------------------------
+
   /**
-   * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
-   * the output to a common type. The transformation calls a
-   *
-   * @param fun1 for each element of the first input and
-   * @param fun2 for each element of the second input. Each
-   * CoMapFunction call returns exactly one element.
-   *
-   * The CoMapFunction used to jointly transform the two input
-   * DataStreams
-   * @return The transformed { @link DataStream}
+   * Applies a CoMap transformation on the connected streams.
+   *
+   * The transformation consists of two separate functions, where
+   * the first one is called for each element of the first connected stream,
+   * and the second one is called for each element of the second connected stream.
+   *
+   * @param fun1 Function called per element of the first input.
+   * @param fun2 Function called per element of the second input.
+   * @return The resulting data stream.
    */
   def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R):
-    DataStream[R] = {
+      DataStream[R] = {
+
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("Map function must not be null.")
     }
@@ -64,66 +77,72 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
-   * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
-   * the output to a common type. The transformation calls a
-   * {@link CoMapFunction#map1} for each element of the first input and
-   * {@link CoMapFunction#map2} for each element of the second input. Each
-   * CoMapFunction call returns exactly one element. The user can also extend
-   * {@link RichCoMapFunction} to gain access to other features provided by
-   * the {@link RichFuntion} interface.
+   * Applies a CoMap transformation on these connected streams.
+   *
+   * The transformation calls [[CoMapFunction#map1]] for each element
+   * in the first stream and [[CoMapFunction#map2]] for each element
+   * of the second stream.
+   *
+   * One can pass a subclass of [[org.apache.flink.streaming.api.functions.co.RichCoMapFunction]]
+   * to gain access to the [[org.apache.flink.api.common.functions.RuntimeContext]]
+   * and to additional life cycle methods.
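+   *
+   * For illustration, a possible use on some `connected: ConnectedStreams[Int, Long]`
+   * (the element types here are assumptions, not part of this change):
+   * {{{
+   *   val asText: DataStream[String] = connected.map(new CoMapFunction[Int, Long, String] {
+   *     // called for elements of the first stream
+   *     override def map1(value: Int): String = "first: " + value
+   *     // called for elements of the second stream
+   *     override def map2(value: Long): String = "second: " + value
+   *   })
+   * }}}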
   *
   * @param coMapper
-   *          The CoMapFunction used to jointly transform the two input
-   *          DataStreams
-   * @return The transformed { @link DataStream}
+   *          The CoMapFunction used to transform the two connected streams
+   * @return
+   *          The resulting data stream
   */
-  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]):
-    DataStream[R] = {
+  def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] = {
     if (coMapper == null) {
       throw new NullPointerException("Map function must not be null.")
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
-   * maps the output to a common type. The transformation calls a
-   * {@link CoFlatMapFunction#flatMap1} for each element of the first input
-   * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none. The user can also extend {@link RichFlatMapFunction} to
-   * gain access to other features provided by the {@link RichFuntion}
-   * interface.
+   * Applies a CoFlatMap transformation on these connected streams.
+   *
+   * The transformation calls [[CoFlatMapFunction#flatMap1]] for each element
+   * in the first stream and [[CoFlatMapFunction#flatMap2]] for each element
+   * of the second stream.
+   *
+   * One can pass a subclass of [[org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction]]
+   * to gain access to the [[org.apache.flink.api.common.functions.RuntimeContext]]
+   * and to additional life cycle methods.
   *
   * @param coFlatMapper
-   *          The CoFlatMapFunction used to jointly transform the two input
-   *          DataStreams
-   * @return The transformed { @link DataStream}
+   *          The CoFlatMapFunction used to transform the two connected streams
+   * @return
+   *          The resulting data stream.
   */
   def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
-    DataStream[R] = {
+      DataStream[R] = {
+
     if (coFlatMapper == null) {
       throw new NullPointerException("FlatMap function must not be null.")
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
-   * maps the output to a common type. The transformation calls a
-   *
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   * @return The transformed { @link DataStream}
+   * Applies a CoFlatMap transformation on the connected streams.
+   *
+   * The transformation consists of two separate functions, where
+   * the first one is called for each element of the first connected stream,
+   * and the second one is called for each element of the second connected stream.
+   *
+   * @param fun1 Function called per element of the first input.
+   * @param fun2 Function called per element of the second input.
+   * @return The resulting data stream.
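+   *
+   * A sketch of a possible use; the element types `Event` and `Rule` are
+   * illustrative assumptions, not part of this change:
+   * {{{
+   *   val tagged: DataStream[String] = events.connect(ruleUpdates).flatMap(
+   *     // called for elements of the first stream
+   *     (event: Event, out: Collector[String]) => out.collect("event: " + event),
+   *     // called for elements of the second stream
+   *     (rule: Rule, out: Collector[String]) => out.collect("rule: " + rule))
+   * }}}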
   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: (IN1, Collector[R]) => Unit,
+  def flatMap[R: TypeInformation: ClassTag](
+      fun1: (IN1, Collector[R]) => Unit,
       fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
+
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
@@ -137,114 +156,101 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
-   * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
-   * maps the output to a common type. The transformation calls a
-   *
-   * @param fun1 for each element of the first input
-   * and @param fun2 for each element of the second
-   * input. Each CoFlatMapFunction call returns any number of elements
-   * including none.
-   * @return The transformed { @link DataStream}
+   * Applies a CoFlatMap transformation on the connected streams.
+   *
+   * The transformation consists of two separate functions, where
+   * the first one is called for each element of the first connected stream,
+   * and the second one is called for each element of the second connected stream.
+   *
+   * @param fun1 Function called per element of the first input.
+   * @param fun2 Function called per element of the second input.
+   * @return The resulting data stream.
   */
-  def flatMap[R: TypeInformation: ClassTag](fun1: IN1 => TraversableOnce[R],
+  def flatMap[R: TypeInformation: ClassTag](
+      fun1: IN1 => TraversableOnce[R],
       fun2: IN2 => TraversableOnce[R]): DataStream[R] = {
+
     if (fun1 == null || fun2 == null) {
       throw new NullPointerException("FlatMap functions must not be null.")
     }
 
     val cleanFun1 = clean(fun1)
     val cleanFun2 = clean(fun2)
+
     val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
       def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value) foreach out.collect }
       def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value) foreach out.collect }
     }
+
     flatMap(flatMapper)
   }
 
+  // ------------------------------------------------------
+  //  grouping and partitioning
+  // ------------------------------------------------------
+
   /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedStreams#reduce}
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param keyPosition1
-   *          The field used to compute the hashcode of the elements in the
-   *          first input stream.
-   * @param keyPosition2
-   *          The field used to compute the hashcode of the elements in the
-   *          second input stream.
-   * @return @return The transformed { @link ConnectedStreams}
+   * @param keyPosition1 The first stream's key field
+   * @param keyPosition2 The second stream's key field
+   * @return The key-grouped connected streams
   */
  def keyBy(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(keyPosition1, keyPosition2)
+    asScalaStream(javaStream.keyBy(keyPosition1, keyPosition2))
  }
 
  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2. Used for
-   * applying function on grouped data streams for example
-   * {@link ConnectedStreams#reduce}
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param keyPositions1
-   *          The fields used to group the first input stream.
-   * @param keyPositions2
-   *          The fields used to group the second input stream.
-   * @return @return The transformed { @link ConnectedStreams}
+   * @param keyPositions1 The first stream's key fields
+   * @param keyPositions2 The second stream's key fields
+   * @return The key-grouped connected streams
   */
-  def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]):
-    ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(keyPositions1, keyPositions2)
+  def keyBy(keyPositions1: Array[Int], keyPositions2: Array[Int]): ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.keyBy(keyPositions1, keyPositions2))
  }
 
  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param field1
-   *          The grouping expression for the first input
-   * @param field2
-   *          The grouping expression for the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param field1 The first stream's key expression
+   * @param field2 The second stream's key expression
+   * @return The key-grouped connected streams
   */
  def keyBy(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(field1, field2)
+    asScalaStream(javaStream.keyBy(field1, field2))
  }
 
  /**
-   * GroupBy operation for connected data stream using key expressions. Groups
-   * the elements of input1 and input2 according to fields1 and fields2. A
-   * field expression is either the name of a public field or a getter method
-   * with parentheses of the {@link DataStream}S underlying type. A dot can be
-   * used to drill down into objects, as in {@code "field1.getInnerField2()" }
-   * .
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param fields1
-   *          The grouping expressions for the first input
-   * @param fields2
-   *          The grouping expressions for the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param fields1 The first stream's key expressions
+   * @param fields2 The second stream's key expressions
+   * @return The key-grouped connected streams
   */
-  def keyBy(fields1: Array[String], fields2: Array[String]):
-    ConnectedStreams[IN1, IN2] = {
-    javaStream.keyBy(fields1, fields2)
+  def keyBy(fields1: Array[String], fields2: Array[String]): ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.keyBy(fields1, fields2))
  }
 
  /**
-   * GroupBy operation for connected data stream. Groups the elements of
-   * input1 and input2 using fun1 and fun2. Used for applying
-   * function on grouped data streams for example
-   * {@link ConnectedStreams#reduce}
+   * Keys the two connected streams together. After this operation, all
+   * elements with the same key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param fun1
-   *          The function used for grouping the first input
-   * @param fun2
-   *          The function used for grouping the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param fun1 The first stream's key function
+   * @param fun2 The second stream's key function
+   * @return The key-grouped connected streams
   */
  def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2: IN2 => K2):
-    ConnectedStreams[IN1, IN2] = {
+      ConnectedStreams[IN1, IN2] = {
 
    val keyType1 = implicitly[TypeInformation[K1]]
    val keyType2 = implicitly[TypeInformation[K2]]
@@ -255,84 +261,74 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
    val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
 
-    javaStream.keyBy(keyExtractor1, keyExtractor2)
+    asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2))
  }
 
  /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 according to keyPosition1 and keyPosition2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param keyPosition1
-   *          The field used to compute the hashcode of the elements in the
-   *          first input stream.
-   * @param keyPosition2
-   *          The field used to compute the hashcode of the elements in the
-   *          second input stream.
-   * @return The transformed { @link ConnectedStreams}
+   * @param keyPosition1 The first stream's partition key field
+   * @param keyPosition2 The second stream's partition key field
+   * @return The co-partitioned connected streams
   */
  def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(keyPosition1, keyPosition2)
+    asScalaStream(javaStream.partitionByHash(keyPosition1, keyPosition2))
  }
 
  /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 according to keyPositions1 and keyPositions2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param keyPositions1
-   *          The fields used to partition the first input stream.
-   * @param keyPositions2
-   *          The fields used to partition the second input stream.
-   * @return The transformed { @link ConnectedStreams}
+   * @param keyPositions1 The first stream's partition key fields
+   * @param keyPositions2 The second stream's partition key fields
+   * @return The co-partitioned connected streams
   */
  def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
-    ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(keyPositions1, keyPositions2)
+      ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.partitionByHash(keyPositions1, keyPositions2))
  }
 
  /**
-   * PartitionBy operation for connected data stream using key expressions. Partitions
-   * the elements of input1 and input2 according to field1 and field2. A field
-   * expression is either the name of a public field or a getter method with
-   * parentheses of the {@link DataStream}S underlying type. A dot can be used
-   * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param field1
-   *          The partitioning expression for the first input
-   * @param field2
-   *          The partitioning expression for the second input
-   * @return The grouped { @link ConnectedStreams}
+   * @param field1 The first stream's partition key expression
+   * @param field2 The second stream's partition key expression
+   * @return The co-partitioned connected streams
   */
  def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(field1, field2)
+    asScalaStream(javaStream.partitionByHash(field1, field2))
  }
 
  /**
-   * PartitionBy operation for connected data stream using key expressions. Partitions
-   * the elements of input1 and input2 according to fields1 and fields2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param fields1
-   *          The partitioning expressions for the first input
-   * @param fields2
-   *          The partitioning expressions for the second input
-   * @return The partitioned { @link ConnectedStreams}
+   * @param fields1 The first stream's partition key field expressions
+   * @param fields2 The second stream's partition key field expressions
+   * @return The co-partitioned connected streams
   */
-  def partitionByHash(fields1: Array[String], fields2: Array[String]):
-    ConnectedStreams[IN1, IN2] = {
-    javaStream.partitionByHash(fields1, fields2)
+  def partitionByHash(fields1: Array[String], fields2: Array[String]):
+      ConnectedStreams[IN1, IN2] = {
+    asScalaStream(javaStream.partitionByHash(fields1, fields2))
  }
 
  /**
-   * PartitionBy operation for connected data stream. Partitions the elements of
-   * input1 and input2 using fun1 and fun2.
+   * Partitions the two connected streams together. After this operation, all
+   * elements with the same partition key from both streams will be sent to the
+   * same parallel instance of the transformation functions.
   *
-   * @param fun1
-   *          The function used for partitioning the first input
-   * @param fun2
-   *          The function used for partitioning the second input
-   * @return The partitioned { @link ConnectedStreams}
+   * @param fun1 The first stream's partition key function
+   * @param fun2 The second stream's partition key function
+   * @return The co-partitioned connected streams
   */
  def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
-    ConnectedStreams[IN1, IN2] = {
+      ConnectedStreams[IN1, IN2] = {
 
    val cleanFun1 = clean(fun1)
    val cleanFun2 = clean(fun2)
@@ -344,17 +340,16 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
      def getKey(in: IN2) = cleanFun2(in)
    }
 
-    javaStream.partitionByHash(keyExtractor1, keyExtractor2)
+    asScalaStream(javaStream.partitionByHash(keyExtractor1, keyExtractor2))
  }
 
  /**
   * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
-   * is not disabled in the {@link org.apache.flink.api.common.ExecutionConfig}
+   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
   */
  private[flink] def clean[F <: AnyRef](f: F): F = {
    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
  }
-
 }
 
 @Internal
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 0f6ec7d0eb0cc563fe2ef2cefc0e84d853d12f27..808f4ea96075bef9b6db5f9cf9e201bdd0aed7e1 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -49,10 +49,10 @@ class DataStream[T](stream: JavaStream[T]) {
   def javaStream: JavaStream[T] = stream
 
   /**
-    * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
-    *
-    * @return associated execution environment
-    */
+   * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
+   *
+   * @return associated execution environment
+   */
   def getExecutionEnvironment: StreamExecutionEnvironment =
     new StreamExecutionEnvironment(stream.getExecutionEnvironment)
 
@@ -112,7 +112,7 @@ class DataStream[T](stream: JavaStream[T]) {
   * @return The named operator
   */
  def name(name: String) : DataStream[T] = stream match {
-    case stream : SingleOutputStreamOperator[T,_] => stream.name(name)
+    case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.name(name))
    case _ => throw new UnsupportedOperationException("Only supported for operators.")
    this
  }
@@ -131,7 +131,7 @@ class DataStream[T](stream: JavaStream[T]) {
   */
  @PublicEvolving
  def uid(uid: String) : DataStream[T] = javaStream match {
-    case stream : SingleOutputStreamOperator[T,_] => stream.uid(uid)
+    case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.uid(uid))
    case _ => throw new UnsupportedOperationException("Only supported for operators.")
    this
  }
@@ -231,7 +231,7 @@ class DataStream[T](stream: JavaStream[T]) {
   *
   */
  def union(dataStreams: DataStream[T]*): DataStream[T] =
-    stream.union(dataStreams.map(_.javaStream): _*)
+    asScalaStream(stream.union(dataStreams.map(_.javaStream): _*))
 
  /**
   * Creates a new ConnectedStreams by connecting
@@ -239,20 +239,20 @@ class DataStream[T](stream: JavaStream[T]) {
   * DataStreams connected using this operators can be used with CoFunctions.
   */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
-    stream.connect(dataStream.javaStream)
+    asScalaStream(stream.connect(dataStream.javaStream))
 
  /**
   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
-  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = stream.keyBy(fields: _*)
+  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = asScalaStream(stream.keyBy(fields: _*))
 
  /**
   * Groups the elements of a DataStream by the given field expressions to
   * be used with grouped operators like grouped reduce or grouped aggregations.
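+   *
+   * For example, assuming a stream of an illustrative case class
+   * `Person(name: String, age: Int)` (not part of this change):
+   * {{{
+   *   // all persons with the same name go to the same parallel instance
+   *   val byName = persons.keyBy("name")
+   * }}}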
   */
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
-    stream.keyBy(firstField +: otherFields.toArray: _*)
+    asScalaStream(stream.keyBy(firstField +: otherFields.toArray: _*))
 
  /**
   * Groups the elements of a DataStream by the given K key to
@@ -267,21 +267,22 @@ class DataStream[T](stream: JavaStream[T]) {
      def getKey(in: T) = cleanFun(in)
      override def getProducedType: TypeInformation[K] = keyType
    }
-    new JavaKeyedStream(stream, keyExtractor, keyType)
+    asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
  }
 
  /**
   * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
-  def partitionByHash(fields: Int*): DataStream[T] = stream.partitionByHash(fields: _*)
+  def partitionByHash(fields: Int*): DataStream[T] =
+    asScalaStream(stream.partitionByHash(fields: _*))
 
  /**
   * Groups the elements of a DataStream by the given field expressions to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
  def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
-    stream.partitionByHash(firstField +: otherFields.toArray: _*)
+    asScalaStream(stream.partitionByHash(firstField +: otherFields.toArray: _*))
 
  /**
   * Groups the elements of a DataStream by the given K key to
@@ -294,7 +295,8 @@ class DataStream[T](stream: JavaStream[T]) {
      def getKey(in: T) = cleanFun(in)
      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
    }
-    stream.partitionByHash(keyExtractor)
+
+    asScalaStream(stream.partitionByHash(keyExtractor))
  }
 
  /**
@@ -305,7 +307,7 @@ class DataStream[T](stream: JavaStream[T]) {
   * Note: This method works only on single field keys.
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
-    stream.partitionCustom(partitioner, field)
+    asScalaStream(stream.partitionCustom(partitioner, field))
 
  /**
   * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
@@ -315,7 +317,8 @@ class DataStream[T](stream: JavaStream[T]) {
   * Note: This method works only on single field keys.
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String)
-    : DataStream[T] = stream.partitionCustom(partitioner, field)
+    : DataStream[T] =
+    asScalaStream(stream.partitionCustom(partitioner, field))
 
  /**
   * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
@@ -326,20 +329,24 @@ class DataStream[T](stream: JavaStream[T]) {
   * of fields.
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
-    : DataStream[T] = {
+    : DataStream[T] = {
+
+    val keyType = implicitly[TypeInformation[K]]
    val cleanFun = clean(fun)
+
    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
      def getKey(in: T) = cleanFun(in)
-      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
+      override def getProducedType(): TypeInformation[K] = keyType
    }
-    stream.partitionCustom(partitioner, keyExtractor)
+
+    asScalaStream(stream.partitionCustom(partitioner, keyExtractor))
  }
 
  /**
   * Sets the partitioning of the DataStream so that the output tuples
   * are broadcast to every parallel instance of the next component.
   */
-  def broadcast: DataStream[T] = stream.broadcast()
+  def broadcast: DataStream[T] = asScalaStream(stream.broadcast())
 
  /**
   * Sets the partitioning of the DataStream so that the output values all go to
@@ -347,27 +354,27 @@ class DataStream[T](stream: JavaStream[T]) {
   * since it might cause a serious performance bottleneck in the application.
   */
  @PublicEvolving
-  def global: DataStream[T] = stream.global()
+  def global: DataStream[T] = asScalaStream(stream.global())
 
  /**
   * Sets the partitioning of the DataStream so that the output tuples
   * are shuffled to the next component.
   */
  @PublicEvolving
-  def shuffle: DataStream[T] = stream.shuffle()
+  def shuffle: DataStream[T] = asScalaStream(stream.shuffle())
 
  /**
   * Sets the partitioning of the DataStream so that the output tuples
   * are forwarded to the local subtask of the next component (whenever
   * possible).
   */
-  def forward: DataStream[T] = stream.forward()
+  def forward: DataStream[T] = asScalaStream(stream.forward())
 
  /**
   * Sets the partitioning of the DataStream so that the output tuples
   * are distributed evenly to the next component.
   */
-  def rebalance: DataStream[T] = stream.rebalance()
+  def rebalance: DataStream[T] = asScalaStream(stream.rebalance())
 
  /**
   * Sets the partitioning of the [[DataStream]] so that the output tuples
@@ -387,7 +394,7 @@ class DataStream[T](stream: JavaStream[T]) {
   * downstream operations will have a differing number of inputs from upstream operations.
   */
  @PublicEvolving
-  def rescale: DataStream[T] = stream.rescale()
+  def rescale: DataStream[T] = asScalaStream(stream.rescale())
 
  /**
   * Initiates an iterative part of the program that creates a loop by feeding
@@ -440,13 +447,16 @@ class DataStream[T](stream: JavaStream[T]) {
   *
   */
  @PublicEvolving
-  def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
-    (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
+  def iterate[R, F: TypeInformation: ClassTag](
+        stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
+        maxWaitTimeMillis:Long): DataStream[R] = {
+
    val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
+
    val connectedIterativeStream = stream.iterate(maxWaitTimeMillis).
      withFeedbackType(feedbackType)
 
-    val (feedback, output) = stepFunction(connectedIterativeStream)
+    val (feedback, output) = stepFunction(asScalaStream(connectedIterativeStream))
    connectedIterativeStream.closeWith(feedback.javaStream)
    output
  }
@@ -475,7 +485,7 @@ class DataStream[T](stream: JavaStream[T]) {
    }
 
    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]])
  }
 
  /**
@@ -488,7 +498,7 @@ class DataStream[T](stream: JavaStream[T]) {
    }
 
    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]])
  }
 
  /**
@@ -528,7 +538,7 @@ class DataStream[T](stream: JavaStream[T]) {
    if (filter == null) {
      throw new NullPointerException("Filter function must not be null.")
    }
-    stream.filter(filter)
+    asScalaStream(stream.filter(filter))
  }
 
  /**
@@ -539,10 +549,10 @@ class DataStream[T](stream: JavaStream[T]) {
      throw new NullPointerException("Filter function must not be null.")
    }
    val cleanFun = clean(fun)
-    val filter = new FilterFunction[T] {
+    val filterFun = new FilterFunction[T] {
      def filter(in: T) = cleanFun(in)
    }
-    this.filter(filter)
+    filter(filterFun)
  }
 
  /**
@@ -643,7 +653,7 @@ class DataStream[T](stream: JavaStream[T]) {
   */
  @deprecated
  def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
-    stream.assignTimestamps(clean(extractor))
+    asScalaStream(stream.assignTimestamps(clean(extractor)))
  }
 
  /**
@@ -671,8 +681,8 @@ class DataStream[T](stream: JavaStream[T]) {
  @PublicEvolving
  def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T])
    : DataStream[T] = {
-
-    stream.assignTimestampsAndWatermarks(assigner)
+
+    asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
  }
 
  /**
@@ -701,8 +711,8 @@ class DataStream[T](stream: JavaStream[T]) {
  @PublicEvolving
  def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T])
    : DataStream[T] = {
-
-    stream.assignTimestampsAndWatermarks(assigner)
+
+    asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
  }
 
  /**
@@ -726,7 +736,7 @@ class DataStream[T](stream: JavaStream[T]) {
        cleanExtractor(element)
      }
    }
-    stream.assignTimestampsAndWatermarks(extractorFunction)
+    asScalaStream(stream.assignTimestampsAndWatermarks(extractorFunction))
  }
 
  /**
@@ -735,7 +745,7 @@ class DataStream[T](stream: JavaStream[T]) {
   * OutputSelector. Calling this method on an operator creates a new
   * [[SplitStream]].
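+   *
+   * A possible use (the stream and output names are illustrative):
+   * {{{
+   *   val splits: SplitStream[Int] = numbers.split(new OutputSelector[Int] {
+   *     override def select(value: Int): java.lang.Iterable[String] =
+   *       java.util.Collections.singletonList(if (value % 2 == 0) "even" else "odd")
+   *   })
+   *   val evens: DataStream[Int] = splits.select("even")
+   * }}}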
   */
-  def split(selector: OutputSelector[T]): SplitStream[T] = stream.split(selector)
+  def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
 
  /**
   * Creates a new [[SplitStream]] that contains only the elements satisfying the
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 21c5d84d55fe9554b31fd8859fe03737e3d695c1..b111d9acf197928176dea875d37d7b5d99e983a7 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -269,13 +269,13 @@ object JoinedStreams {
 
       val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
 
-      join
+      asScalaStream(join
         .where(keySelector1)
         .equalTo(keySelector2)
         .window(windowAssigner)
         .trigger(trigger)
         .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]])
+        .apply(clean(function), implicitly[TypeInformation[T]]))
     }
 
     /**
@@ -286,13 +286,13 @@ object JoinedStreams {
 
       val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
 
-      join
+      asScalaStream(join
         .where(keySelector1)
         .equalTo(keySelector2)
         .window(windowAssigner)
         .trigger(trigger)
         .evictor(evictor)
-        .apply(clean(function), implicitly[TypeInformation[T]])
+        .apply(clean(function), implicitly[TypeInformation[T]]))
     }
 
     /**
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 8419f89ec1712037df1cb1c4e791773cfacfcb5f..b492bf9028b13b2cf2067e178392221414bf8f36 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -128,7 +128,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     throw new NullPointerException("Reduce function must not be null.")
    }
 
-    javaStream.reduce(reducer)
+    asScalaStream(javaStream.reduce(reducer))
  }
 
  /**
@@ -141,7 +141,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    }
    val cleanFun = clean(fun)
    val reducer = new ReduceFunction[T] {
-      def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
+      def reduce(v1: T, v2: T) : T = { cleanFun(v1, v2) }
    }
    reduce(reducer)
  }
@@ -152,15 +152,15 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
   * aggregate is kept per key.
   */
  def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]):
-    DataStream[R] = {
+      DataStream[R] = {
    if (folder == null) {
      throw new NullPointerException("Fold function must not be null.")
    }
 
    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
 
-    javaStream.fold(initialValue, folder).
-      returns(outType).asInstanceOf[JavaStream[R]]
+    asScalaStream(javaStream.fold(initialValue, folder).
+      returns(outType).asInstanceOf[JavaStream[R]])
  }
 
  /**
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
index ac83514acbae9d64051addf1c3c7764e258c356e..0b9ac69c75b73c8d34e3279cf7ed8b6e6e170a41 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/SplitStream.scala
@@ -34,6 +34,6 @@ class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaS
  /**
   * Sets the output names for which the next operator will receive values.
   */
-  def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
-
+  def select(outputNames: String*): DataStream[T] =
+    asScalaStream(javaStream.select(outputNames: _*))
 }
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index a7152e22c9cc252839f0ce6d2677e201062af612..4107e4dd52564018fda61f2a3dde9e809b24cdbe 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -27,7 +27,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.AbstractStateBackend
-import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -404,7 +403,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    require(data != null, "Data must not be null.")
    val typeInfo = implicitly[TypeInformation[T]]
 
-    javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo)
+    val collection = scala.collection.JavaConversions.asJavaCollection(data)
+    asScalaStream(javaEnv.fromCollection(collection, typeInfo))
  }
 
  /**
@@ -415,33 +415,32 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   */
  def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromCollection(data.asJava, typeInfo)
+    asScalaStream(javaEnv.fromCollection(data.asJava, typeInfo))
  }
 
  /**
   * Creates a DataStream from the given [[SplittableIterator]].
   */
  def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
-    DataStream[T] = {
+      DataStream[T] = {
    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.fromParallelCollection(data, typeInfo)
+    asScalaStream(javaEnv.fromParallelCollection(data, typeInfo))
  }
 
  /**
   * Creates a DataStream that represents the Strings produced by reading the
   * given file line wise. The file will be read with the system's default
   * character set.
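+   *
+   * For example (the path is illustrative):
+   * {{{
+   *   val lines: DataStream[String] = env.readTextFile("file:///path/to/input.txt")
+   * }}}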
-   *
   */
  def readTextFile(filePath: String): DataStream[String] =
-    javaEnv.readTextFile(filePath)
+    asScalaStream(javaEnv.readTextFile(filePath))
 
  /**
   * Creates a data stream that represents the Strings produced by reading the given file
   * line wise. The character set with the given name will be used to read the files.
   */
  def readTextFile(filePath: String, charsetName: String): DataStream[String] =
-    javaEnv.readTextFile(filePath, charsetName)
+    asScalaStream(javaEnv.readTextFile(filePath, charsetName))
 
 
  /**
@@ -449,8 +448,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
   */
  def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
-    DataStream[T] =
-    javaEnv.readFile(inputFormat, filePath)
+      DataStream[T] =
+    asScalaStream(javaEnv.readFile(inputFormat, filePath))
 
 
  /**
@@ -461,9 +460,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   * every 100 milliseconds.
   *
   */
-  def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType =
-    WatchType.ONLY_NEW_FILES): DataStream[String] =
-    javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
+  def readFileStream(StreamPath: String, intervalMillis: Long = 100,
+      watchType: WatchType = WatchType.ONLY_NEW_FILES): DataStream[String] =
+    asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, watchType))
 
  /**
   * Creates a new DataStream that contains the strings received infinitely
@@ -473,8 +472,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   */
  @PublicEvolving
  def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
-    DataStream[String] =
-    javaEnv.socketTextStream(hostname, port)
+      DataStream[String] =
+    asScalaStream(javaEnv.socketTextStream(hostname, port))
 
  /**
   * Generic method to create an input data stream with a specific input format.
@@ -484,7 +483,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   */
  @PublicEvolving
  def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
-    javaEnv.createInput(inputFormat)
+    asScalaStream(javaEnv.createInput(inputFormat))
 
  /**
   * Create a DataStream using a user defined source function for arbitrary
@@ -497,15 +496,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   */
  def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
    require(function != null, "Function must not be null.")
+
    val cleanFun = scalaClean(function)
    val typeInfo = implicitly[TypeInformation[T]]
-    javaEnv.addSource(cleanFun).returns(typeInfo)
+    asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo))
  }
 
  /**
   * Create a DataStream using a user defined source function for arbitrary
   * source functionality.
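+   *
+   * A minimal illustrative source (the counting logic is an assumption,
+   * not part of this change):
+   * {{{
+   *   val nums: DataStream[Long] = env.addSource[Long] { ctx =>
+   *     var i = 0L
+   *     while (i < 100) {
+   *       ctx.collect(i)
+   *       i += 1
+   *     }
+   *   }
+   * }}}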
- * */ def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = { require(function != null, "Function must not be null.") diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 22d24fa6539fb46b9be9b1eb5022532b2acab5f2..d34dd1b6f5152943a7f2d9a14dd52970399e8224 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -102,7 +102,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @return The data stream that is the result of applying the reduce function to the window. */ def reduce(function: ReduceFunction[T]): DataStream[T] = { - javaStream.reduce(clean(function)) + asScalaStream(javaStream.reduce(clean(function))) } /** @@ -148,7 +148,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { val resultType : TypeInformation[R] = implicitly[TypeInformation[R]] - javaStream.fold(initialValue, function, resultType) + asScalaStream(javaStream.fold(initialValue, function, resultType)) } /** @@ -185,13 +185,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { */ def apply[R: TypeInformation: ClassTag]( function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = { + val cleanFunction = clean(function) val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] { def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = { cleanFunction.apply(key, window, input.asScala, out) } } - javaStream.apply(javaFunction, implicitly[TypeInformation[R]]) + + asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]])) } /** @@ -217,7 +219,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { cleanedFunction(key, window, elements.asScala, out) } } - javaStream.apply(applyFunction, implicitly[TypeInformation[R]]) + asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]])) } /** @@ -234,7 +236,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { def apply[R: TypeInformation: ClassTag]( preAggregator: ReduceFunction[T], function: WindowFunction[T, R, K, W]): DataStream[R] = { - javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]]) + + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.apply(clean(preAggregator), clean(function), resultType)) } /** @@ -251,6 +255,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { def apply[R: TypeInformation: ClassTag]( preAggregator: (T, T) => T, function: (K, W, T, Collector[R]) => Unit): DataStream[R] = { + if (function == null) { throw new NullPointerException("Reduce function must not be null.") } @@ -269,7 +274,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { cleanApply(key, window, input, out) } } - javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]) + + asScalaStream(javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])) } /** @@ -288,11 +294,12 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { initialValue: R, foldFunction: FoldFunction[T, R], function: WindowFunction[R, R, K, W]): DataStream[R] = { - 
javaStream.apply( + + asScalaStream(javaStream.apply( initialValue, clean(foldFunction), clean(function), - implicitly[TypeInformation[R]]) + implicitly[TypeInformation[R]])) } /** @@ -310,6 +317,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { initialValue: R, foldFunction: (R, T) => R, function: (K, W, R, Collector[R]) => Unit): DataStream[R] = { + if (function == null) { throw new NullPointerException("Fold function must not be null.") } @@ -328,10 +336,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { cleanApply(key, window, input, out) } } - javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]]) + val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] + asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType)) } - // ------------------------------------------------------------------------ // Aggregations on the keyed windows // ------------------------------------------------------------------------ diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala index 26b0265efd19b5f5cf0ff547763b65bdb1556b7b..90f255c12861e3453f27db43faafb6115af81632 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/package.scala @@ -18,8 +18,6 @@ package org.apache.flink.streaming.api -import _root_.scala.reflect.ClassTag -import language.experimental.macros import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator} import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils} @@ -27,25 +25,44 @@ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream } import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream } import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams } import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream } + import language.implicitConversions +import language.experimental.macros package object scala { + // We have this here so that we always have generated TypeInformationS when // using the Scala API implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T] - implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] = - new DataStream[R](javaStream) - - implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]): - KeyedStream[R, K] = new KeyedStream[R, K](javaStream) + /** + * Converts an [[org.apache.flink.streaming.api.datastream.DataStream]] to a + * [[org.apache.flink.streaming.api.scala.DataStream]]. + */ + private[flink] def asScalaStream[R](stream: JavaStream[R]) + = new DataStream[R](stream) - implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R] = - new SplitStream[R](javaStream) + /** + * Converts an [[org.apache.flink.streaming.api.datastream.KeyedStream]] to a + * [[org.apache.flink.streaming.api.scala.KeyedStream]]. 
+ */ + private[flink] def asScalaStream[R, K](stream: KeyedJavaStream[R, K]) + = new KeyedStream[R, K](stream) - implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]): - ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream) + /** + * Converts an [[org.apache.flink.streaming.api.datastream.SplitStream]] to a + * [[org.apache.flink.streaming.api.scala.SplitStream]]. + */ + private[flink] def asScalaStream[R](stream: SplitJavaStream[R]) + = new SplitStream[R](stream) + /** + * Converts an [[org.apache.flink.streaming.api.datastream.ConnectedStreams]] to a + * [[org.apache.flink.streaming.api.scala.ConnectedStreams]]. + */ + private[flink] def asScalaStream[IN1, IN2](stream: ConnectedJavaStreams[IN1, IN2]) + = new ConnectedStreams[IN1, IN2](stream) + private[flink] def fieldNames2Indices( typeInfo: TypeInformation[_], fields: Array[String]): Array[Int] = {
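
Reviewer note: with the implicit javaToScala* conversions replaced by explicit, private[flink] asScalaStream(...) overloads, user programs no longer have a JavaStream-to-DataStream view silently in scope; each Scala wrapper now performs the conversion itself. A minimal smoke-test sketch of unaffected user code follows (the object name and job name are illustrative, not part of this change):

    import org.apache.flink.streaming.api.scala._

    object AsScalaStreamSmokeTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // The wrapper methods used below all return Scala DataStreams by
        // calling asScalaStream(...) internally, so this chain type-checks
        // without any implicit Java-to-Scala stream conversion in scope.
        val counts: DataStream[(String, Int)] = env
          .fromElements("to be or not to be")
          .flatMap(_.split("\\s+"))
          .map((_, 1))
          .keyBy(0)
          .sum(1)

        counts.print()
        env.execute("asScalaStream smoke test")
      }
    }

Making the overloads private[flink] instead of public implicits keeps the conversion an internal detail: callers cannot accidentally mix the Java and Scala DataStream types, and dropping the implicits removes a source of surprising conversions during overload resolution.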