Commit 6dda5316 authored by Stephan Ewen

[FLINK-3413] [streaming] Make implicit conversions from Java DataStream to Scala DataStream explicit

This also cleans up a lot of JavaDocs in various Scala DataStream API classes.
Parent bd137aec
......@@ -24,7 +24,7 @@ import org.apache.flink.api.java.table.JavaStreamingTranslator
import org.apache.flink.api.table.Table
import org.apache.flink.api.table.plan._
import org.apache.flink.api.table.expressions.Expression
import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
import org.apache.flink.streaming.api.scala.{DataStream, asScalaStream}
/**
* [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
......@@ -41,7 +41,7 @@ class ScalaStreamingTranslator extends PlanTranslator {
override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
// fake it till you make it ...
javaToScalaStream(javaTranslator.translate(op))
asScalaStream(javaTranslator.translate(op))
}
override def createTable[A](
......
......@@ -321,8 +321,8 @@ public class CoGroupedStreams<T1, T2> {
private static class UnionTypeInfo<T1, T2> extends TypeInformation<TaggedUnion<T1, T2>> {
private static final long serialVersionUID = 1L;
TypeInformation<T1> oneType;
TypeInformation<T2> twoType;
private final TypeInformation<T1> oneType;
private final TypeInformation<T2> twoType;
public UnionTypeInfo(TypeInformation<T1> oneType,
TypeInformation<T2> twoType) {
......
......@@ -31,10 +31,21 @@ import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import static java.util.Objects.requireNonNull;
/**
* {@code ConnectedStreams} represents two connected streams of (possible) different data types. It
* can be used to apply transformations such as {@link CoMapFunction} on two
* {@link DataStream DataStreams}
* ConnectedStreams represent two connected streams of (possibly) different data types.
* Connected streams are useful for cases where operations on one stream directly
* affect the operations on the other stream, usually via shared state between the streams.
*
* <p>An example for the use of connected streams would be to apply rules that change over time
* onto another stream. One of the connected streams has the rules, the other stream the
* elements to apply the rules to. The operation on the connected stream maintains the
* current set of rules in the state. It may receive either a rule update and update the state
* or a data element and apply the rules in the state to the element.
*
* <p>The connected stream can be conceptually viewed as a union stream of an Either type, that
* holds either the first stream's type or the second stream's type.
*
* @param <IN1> Type of the first input data stream.
* @param <IN2> Type of the second input data stream.
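For illustration, a minimal sketch of the pattern this javadoc describes (not part of this commit; assumes an existing StreamExecutionEnvironment `env` and `import org.apache.flink.streaming.api.scala._`):

// one stream carries rule updates, the other the elements to apply them to
val ruleUpdates: DataStream[String] = env.fromElements("rule-a", "rule-b")
val elements: DataStream[Long] = env.fromElements(1L, 2L, 3L)

// connect them so a single co-operator can share state across both inputs
val connected: ConnectedStreams[String, Long] = ruleUpdates.connect(elements)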
......@@ -42,20 +53,14 @@ import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
@Public
public class ConnectedStreams<IN1, IN2> {
protected StreamExecutionEnvironment environment;
protected DataStream<IN1> inputStream1;
protected DataStream<IN2> inputStream2;
protected final StreamExecutionEnvironment environment;
protected final DataStream<IN1> inputStream1;
protected final DataStream<IN2> inputStream2;
protected ConnectedStreams(StreamExecutionEnvironment env,
DataStream<IN1> input1,
DataStream<IN2> input2) {
this.environment = env;
if (input1 != null) {
this.inputStream1 = input1;
}
if (input2 != null) {
this.inputStream2 = input2;
}
protected ConnectedStreams(StreamExecutionEnvironment env, DataStream<IN1> input1, DataStream<IN2> input2) {
this.environment = requireNonNull(env);
this.inputStream1 = requireNonNull(input1);
this.inputStream2 = requireNonNull(input2);
}
public StreamExecutionEnvironment getExecutionEnvironment() {
......
......@@ -355,7 +355,7 @@ public class DataStream<T> {
* Note: This method works only on single field keys.
*
* @param partitioner The partitioner to assign partitions to keys.
* @param field The field index on which the DataStream is to partitioned.
* @param field The expression for the field on which the DataStream is to be partitioned.
* @return The partitioned DataStream.
*/
public <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, String field) {
......
......@@ -25,20 +25,44 @@ import org.apache.flink.api.common.functions.Function;
import org.apache.flink.util.Collector;
/**
* A CoFlatMapFunction represents a FlatMap transformation with two different
* input types.
* A CoFlatMapFunction implements a flat-map transformation over two
* connected streams.
*
* <p>The same instance of the transformation function is used to transform
* both of the connected streams. That way, the stream transformations can
* share state.
*
* <p>An example for the use of connected streams would be to apply rules that change over time
* onto elements of a stream. One of the connected streams has the rules, the other stream the
* elements to apply the rules to. The operation on the connected stream maintains the
* current set of rules in the state. It may receive either a rule update (from the first stream)
* and update the state, or a data element (from the second stream) and apply the rules in the
* state to the element. The result of applying the rules would be emitted.
*
* @param <IN1>
* Type of the first input.
* @param <IN2>
* Type of the second input.
* @param <OUT>
* Output type.
* @param <IN1> Type of the first input.
* @param <IN2> Type of the second input.
* @param <OUT> Output type.
*/
@Public
public interface CoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
/**
* This method is called for each element in the first of the connected streams.
*
* @param value The stream element
* @param out The collector to emit resulting elements to
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
void flatMap1(IN1 value, Collector<OUT> out) throws Exception;
/**
* This method is called for each element in the second of the connected streams.
*
* @param value The stream element
* @param out The collector to emit resulting elements to
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
void flatMap2(IN2 value, Collector<OUT> out) throws Exception;
}
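A hedged sketch of the rules example from the javadoc, implemented against this interface (simplified: the rule set is kept in a plain field rather than in Flink's checkpointed state):

import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
import org.apache.flink.util.Collector

class ApplyRules extends CoFlatMapFunction[String, Long, String] {
  // simplification: a fault-tolerant operator would keep this in managed state
  private var rules = Set.empty[String]

  // rule update from the first stream: remember it
  override def flatMap1(rule: String, out: Collector[String]): Unit =
    rules += rule

  // data element from the second stream: apply the current rules and emit
  override def flatMap2(value: Long, out: Collector[String]): Unit =
    rules.foreach(rule => out.collect(s"$rule -> $value"))
}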
......@@ -24,20 +24,37 @@ import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.functions.Function;
/**
* A CoMapFunction represents a Map transformation with two different input
* types.
* A CoMapFunction implements a map() transformation over two
* connected streams.
*
* @param <IN1>
* Type of the first input.
* @param <IN2>
* Type of the second input.
* @param <OUT>
* Output type.
* <p>The same instance of the transformation function is used to transform
* both of the connected streams. That way, the stream transformations can
* share state.
*
* @param <IN1> Type of the first input.
* @param <IN2> Type of the second input.
* @param <OUT> Output type.
*/
@Public
public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
/**
* This method is called for each element in the first of the connected streams.
*
* @param value The stream element
* @return The resulting element
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
OUT map1(IN1 value) throws Exception;
/**
* This method is called for each element in the second of the connected streams.
*
* @param value The stream element
* @return The resulting element
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
OUT map2(IN2 value) throws Exception;
}
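For comparison, a hedged sketch that implements this interface to normalize two differently typed inputs into one output type:

import org.apache.flink.streaming.api.functions.co.CoMapFunction

class Normalize extends CoMapFunction[Int, Double, String] {
  override def map1(value: Int): String = s"int: $value"
  override def map2(value: Double): String = s"double: $value"
}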
......@@ -99,7 +99,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @return The data stream that is the result of applying the reduce function to the window.
*/
def reduce(function: ReduceFunction[T]): DataStream[T] = {
javaStream.reduce(clean(function))
asScalaStream(javaStream.reduce(clean(function)))
}
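Usage sketch for the non-keyed window reduce above (illustrative only; assumes `env` and the scala API import):

import org.apache.flink.streaming.api.windowing.time.Time

// sum all elements of each 5-second window across the whole stream
val windowSums: DataStream[Long] =
  env.fromElements(1L, 2L, 3L).timeWindowAll(Time.seconds(5)).reduce(_ + _)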
/**
......@@ -139,13 +139,13 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
def fold[R: TypeInformation: ClassTag](
initialValue: R,
function: FoldFunction[T,R]): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
}
val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
javaStream.fold(initialValue, function, resultType)
asScalaStream(javaStream.fold(initialValue, function, resultType))
}
/**
......@@ -182,13 +182,14 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
*/
def apply[R: TypeInformation: ClassTag](
function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
val cleanedFunction = clean(function)
val javaFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanedFunction(window, elements.asScala, out)
}
}
javaStream.apply(javaFunction, implicitly[TypeInformation[R]])
asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
}
/**
......@@ -204,13 +205,14 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
*/
def apply[R: TypeInformation: ClassTag](
function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
val cleanedFunction = clean(function)
val applyFunction = new AllWindowFunction[java.lang.Iterable[T], R, W] {
def apply(window: W, elements: java.lang.Iterable[T], out: Collector[R]): Unit = {
cleanedFunction(window, elements.asScala, out)
}
}
javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
}
/**
......@@ -227,7 +229,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
def apply[R: TypeInformation: ClassTag](
preAggregator: ReduceFunction[T],
function: AllWindowFunction[T, R, W]): DataStream[R] = {
javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(clean(preAggregator), clean(function), returnType))
}
/**
......@@ -262,7 +266,9 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
cleanApply(window, input, out)
}
}
javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(reducer, applyFunction, returnType))
}
/**
......@@ -281,11 +287,12 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
initialValue: R,
preAggregator: FoldFunction[T, R],
function: AllWindowFunction[R, R, W]): DataStream[R] = {
javaStream.apply(
asScalaStream(javaStream.apply(
initialValue,
clean(preAggregator),
clean(function),
implicitly[TypeInformation[R]])
implicitly[TypeInformation[R]]))
}
/**
......@@ -322,7 +329,8 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
cleanApply(window, input, out)
}
}
javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]])
val returnType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType))
}
// ------------------------------------------------------------------------
......
......@@ -278,13 +278,13 @@ object CoGroupedStreams {
val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream)
coGroup
asScalaStream(coGroup
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.apply(clean(function), implicitly[TypeInformation[T]])
.apply(clean(function), implicitly[TypeInformation[T]]))
}
/**
......
......@@ -49,10 +49,10 @@ class DataStream[T](stream: JavaStream[T]) {
def javaStream: JavaStream[T] = stream
/**
* Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
*
* @return associated execution environment
*/
* Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
*
* @return associated execution environment
*/
def getExecutionEnvironment: StreamExecutionEnvironment =
new StreamExecutionEnvironment(stream.getExecutionEnvironment)
......@@ -112,7 +112,7 @@ class DataStream[T](stream: JavaStream[T]) {
* @return The named operator
*/
def name(name: String) : DataStream[T] = stream match {
case stream : SingleOutputStreamOperator[T,_] => stream.name(name)
case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.name(name))
case _ => throw new UnsupportedOperationException("Only supported for operators.")
this
}
......@@ -131,7 +131,7 @@ class DataStream[T](stream: JavaStream[T]) {
*/
@PublicEvolving
def uid(uid: String) : DataStream[T] = javaStream match {
case stream : SingleOutputStreamOperator[T,_] => stream.uid(uid)
case stream : SingleOutputStreamOperator[T,_] => asScalaStream(stream.uid(uid))
case _ => throw new UnsupportedOperationException("Only supported for operators.")
this
}
......@@ -231,7 +231,7 @@ class DataStream[T](stream: JavaStream[T]) {
*
*/
def union(dataStreams: DataStream[T]*): DataStream[T] =
stream.union(dataStreams.map(_.javaStream): _*)
asScalaStream(stream.union(dataStreams.map(_.javaStream): _*))
/**
* Creates a new ConnectedStreams by connecting
......@@ -239,20 +239,20 @@ class DataStream[T](stream: JavaStream[T]) {
* DataStreams connected using this operators can be used with CoFunctions.
*/
def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
stream.connect(dataStream.javaStream)
asScalaStream(stream.connect(dataStream.javaStream))
/**
* Groups the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = stream.keyBy(fields: _*)
def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = asScalaStream(stream.keyBy(fields: _*))
/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
stream.keyBy(firstField +: otherFields.toArray: _*)
asScalaStream(stream.keyBy(firstField +: otherFields.toArray: _*))
/**
* Groups the elements of a DataStream by the given K key to
......@@ -267,21 +267,22 @@ class DataStream[T](stream: JavaStream[T]) {
def getKey(in: T) = cleanFun(in)
override def getProducedType: TypeInformation[K] = keyType
}
new JavaKeyedStream(stream, keyExtractor, keyType)
asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}
/**
* Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def partitionByHash(fields: Int*): DataStream[T] = stream.partitionByHash(fields: _*)
def partitionByHash(fields: Int*): DataStream[T] =
asScalaStream(stream.partitionByHash(fields: _*))
/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
stream.partitionByHash(firstField +: otherFields.toArray: _*)
asScalaStream(stream.partitionByHash(firstField +: otherFields.toArray: _*))
/**
* Groups the elements of a DataStream by the given K key to
......@@ -294,7 +295,8 @@ class DataStream[T](stream: JavaStream[T]) {
def getKey(in: T) = cleanFun(in)
override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
stream.partitionByHash(keyExtractor)
asScalaStream(stream.partitionByHash(keyExtractor))
}
/**
......@@ -305,7 +307,7 @@ class DataStream[T](stream: JavaStream[T]) {
* Note: This method works only on single field keys.
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
stream.partitionCustom(partitioner, field)
asScalaStream(stream.partitionCustom(partitioner, field))
/**
* Partitions a POJO DataStream on the specified key fields using a custom partitioner.
......@@ -315,7 +317,8 @@ class DataStream[T](stream: JavaStream[T]) {
* Note: This method works only on single field keys.
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String)
: DataStream[T] = stream.partitionCustom(partitioner, field)
: DataStream[T] =
asScalaStream(stream.partitionCustom(partitioner, field))
/**
* Partitions a DataStream on the key returned by the selector, using a custom partitioner.
......@@ -326,20 +329,24 @@ class DataStream[T](stream: JavaStream[T]) {
* of fields.
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], fun: T => K)
: DataStream[T] = {
: DataStream[T] = {
val keyType = implicitly[TypeInformation[K]]
val cleanFun = clean(fun)
val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
override def getProducedType(): TypeInformation[K] = keyType
}
stream.partitionCustom(partitioner, keyExtractor)
asScalaStream(stream.partitionCustom(partitioner, keyExtractor))
}
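A hedged usage sketch for the selector-based variant above (names are illustrative; assumes `env`):

import org.apache.flink.api.common.functions.Partitioner

// route words to partitions by their length
val byLength = new Partitioner[Int] {
  override def partition(key: Int, numPartitions: Int): Int = key % numPartitions
}
val partitioned: DataStream[String] =
  env.fromElements("apple", "banana").partitionCustom(byLength, _.length)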
/**
* Sets the partitioning of the DataStream so that the output tuples
* are broadcast to every parallel instance of the next component.
*/
def broadcast: DataStream[T] = stream.broadcast()
def broadcast: DataStream[T] = asScalaStream(stream.broadcast())
/**
* Sets the partitioning of the DataStream so that the output values all go to
......@@ -347,27 +354,27 @@ class DataStream[T](stream: JavaStream[T]) {
* since it might cause a serious performance bottleneck in the application.
*/
@PublicEvolving
def global: DataStream[T] = stream.global()
def global: DataStream[T] = asScalaStream(stream.global())
/**
* Sets the partitioning of the DataStream so that the output tuples
* are shuffled to the next component.
*/
@PublicEvolving
def shuffle: DataStream[T] = stream.shuffle()
def shuffle: DataStream[T] = asScalaStream(stream.shuffle())
/**
* Sets the partitioning of the DataStream so that the output tuples
* are forwarded to the local subtask of the next component (whenever
* possible).
*/
def forward: DataStream[T] = stream.forward()
def forward: DataStream[T] = asScalaStream(stream.forward())
/**
* Sets the partitioning of the DataStream so that the output tuples
* are distributed evenly to the next component.
*/
def rebalance: DataStream[T] = stream.rebalance()
def rebalance: DataStream[T] = asScalaStream(stream.rebalance())
/**
* Sets the partitioning of the [[DataStream]] so that the output tuples
......@@ -387,7 +394,7 @@ class DataStream[T](stream: JavaStream[T]) {
* downstream operations will have a differing number of inputs from upstream operations.
*/
@PublicEvolving
def rescale: DataStream[T] = stream.rescale()
def rescale: DataStream[T] = asScalaStream(stream.rescale())
/**
* Initiates an iterative part of the program that creates a loop by feeding
......@@ -440,13 +447,16 @@ class DataStream[T](stream: JavaStream[T]) {
*
*/
@PublicEvolving
def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
(DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
def iterate[R, F: TypeInformation: ClassTag](
stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
maxWaitTimeMillis: Long): DataStream[R] = {
val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
val connectedIterativeStream = stream.iterate(maxWaitTimeMillis).
withFeedbackType(feedbackType)
val (feedback, output) = stepFunction(connectedIterativeStream)
val (feedback, output) = stepFunction(asScalaStream(connectedIterativeStream))
connectedIterativeStream.closeWith(feedback.javaStream)
output
}
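A hedged sketch of the step-function shape this method expects (feedback and output types are both Long here; assumes `env`):

// decrement values each round; feed positives back, emit the rest
val result: DataStream[Long] = env.generateSequence(1, 5).iterate[Long, Long](
  (it: ConnectedStreams[Long, Long]) => {
    val step = it.map(v => v - 1, v => v - 1)
    (step.filter(_ > 0), step.filter(_ <= 0))
  },
  5000)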
......@@ -475,7 +485,7 @@ class DataStream[T](stream: JavaStream[T]) {
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
asScalaStream(stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]])
}
/**
......@@ -488,7 +498,7 @@ class DataStream[T](stream: JavaStream[T]) {
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
asScalaStream(stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]])
}
/**
......@@ -528,7 +538,7 @@ class DataStream[T](stream: JavaStream[T]) {
if (filter == null) {
throw new NullPointerException("Filter function must not be null.")
}
stream.filter(filter)
asScalaStream(stream.filter(filter))
}
/**
......@@ -539,10 +549,10 @@ class DataStream[T](stream: JavaStream[T]) {
throw new NullPointerException("Filter function must not be null.")
}
val cleanFun = clean(fun)
val filter = new FilterFunction[T] {
val filterFun = new FilterFunction[T] {
def filter(in: T) = cleanFun(in)
}
this.filter(filter)
filter(filterFun)
}
/**
......@@ -643,7 +653,7 @@ class DataStream[T](stream: JavaStream[T]) {
*/
@deprecated
def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
stream.assignTimestamps(clean(extractor))
asScalaStream(stream.assignTimestamps(clean(extractor)))
}
/**
......@@ -671,8 +681,8 @@ class DataStream[T](stream: JavaStream[T]) {
@PublicEvolving
def assignTimestampsAndWatermarks(assigner: AssignerWithPeriodicWatermarks[T])
: DataStream[T] = {
stream.assignTimestampsAndWatermarks(assigner)
asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}
/**
......@@ -701,8 +711,8 @@ class DataStream[T](stream: JavaStream[T]) {
@PublicEvolving
def assignTimestampsAndWatermarks(assigner: AssignerWithPunctuatedWatermarks[T])
: DataStream[T] = {
stream.assignTimestampsAndWatermarks(assigner)
asScalaStream(stream.assignTimestampsAndWatermarks(assigner))
}
/**
......@@ -726,7 +736,7 @@ class DataStream[T](stream: JavaStream[T]) {
cleanExtractor(element)
}
}
stream.assignTimestampsAndWatermarks(extractorFunction)
asScalaStream(stream.assignTimestampsAndWatermarks(extractorFunction))
}
/**
......@@ -735,7 +745,7 @@ class DataStream[T](stream: JavaStream[T]) {
* OutputSelector. Calling this method on an operator creates a new
* [[SplitStream]].
*/
def split(selector: OutputSelector[T]): SplitStream[T] = stream.split(selector)
def split(selector: OutputSelector[T]): SplitStream[T] = asScalaStream(stream.split(selector))
/**
* Creates a new [[SplitStream]] that contains only the elements satisfying the
......
......@@ -269,13 +269,13 @@ object JoinedStreams {
val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
join
asScalaStream(join
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.apply(clean(function), implicitly[TypeInformation[T]])
.apply(clean(function), implicitly[TypeInformation[T]]))
}
/**
......@@ -286,13 +286,13 @@ object JoinedStreams {
val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
join
asScalaStream(join
.where(keySelector1)
.equalTo(keySelector2)
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
.apply(clean(function), implicitly[TypeInformation[T]])
.apply(clean(function), implicitly[TypeInformation[T]]))
}
/**
......
......@@ -128,7 +128,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
throw new NullPointerException("Reduce function must not be null.")
}
javaStream.reduce(reducer)
asScalaStream(javaStream.reduce(reducer))
}
/**
......@@ -141,7 +141,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
}
val cleanFun = clean(fun)
val reducer = new ReduceFunction[T] {
def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
def reduce(v1: T, v2: T) : T = { cleanFun(v1, v2) }
}
reduce(reducer)
}
......@@ -152,15 +152,15 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* aggregate is kept per key.
*/
def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]):
DataStream[R] = {
DataStream[R] = {
if (folder == null) {
throw new NullPointerException("Fold function must not be null.")
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
javaStream.fold(initialValue, folder).
returns(outType).asInstanceOf[JavaStream[R]]
asScalaStream(javaStream.fold(initialValue, folder).
returns(outType).asInstanceOf[JavaStream[R]])
}
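Usage sketch for fold on a keyed stream (hedged; assumes `env`):

// running character count per word
val counts: DataStream[Long] = env.fromElements("a", "ab", "a")
  .keyBy(w => w)
  .fold(0L, (acc: Long, word: String) => acc + word.length)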
/**
......
......@@ -34,6 +34,6 @@ class SplitStream[T](javaStream: SplitJavaStream[T]) extends DataStream[T](javaS
/**
* Sets the output names for which the next operator will receive values.
*/
def select(outputNames: String*): DataStream[T] = javaStream.select(outputNames: _*)
def select(outputNames: String*): DataStream[T] =
asScalaStream(javaStream.select(outputNames: _*))
}
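Usage sketch (hedged; assumes `env`):

// split by parity, then consume only one side
val parity: SplitStream[Int] = env.fromElements(1, 2, 3, 4)
  .split(n => if (n % 2 == 0) Seq("even") else Seq("odd"))
val evens: DataStream[Int] = parity.select("even")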
......@@ -27,7 +27,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.runtime.state.AbstractStateBackend
import org.apache.flink.streaming.api.{TimeCharacteristic, CheckpointingMode}
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.SourceFunction
......@@ -404,7 +403,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
require(data != null, "Data must not be null.")
val typeInfo = implicitly[TypeInformation[T]]
javaEnv.fromCollection(scala.collection.JavaConversions.asJavaCollection(data), typeInfo)
val collection = scala.collection.JavaConversions.asJavaCollection(data)
asScalaStream(javaEnv.fromCollection(collection, typeInfo))
}
/**
......@@ -415,33 +415,32 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
javaEnv.fromCollection(data.asJava, typeInfo)
asScalaStream(javaEnv.fromCollection(data.asJava, typeInfo))
}
/**
* Creates a DataStream from the given [[SplittableIterator]].
*/
def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
DataStream[T] = {
DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
javaEnv.fromParallelCollection(data, typeInfo)
asScalaStream(javaEnv.fromParallelCollection(data, typeInfo))
}
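Usage sketch for the collection sources above (hedged; assumes `env`):

val fromSeq: DataStream[Int] = env.fromCollection(Seq(1, 2, 3))
val fromIter: DataStream[Int] = env.fromCollection(Iterator(4, 5, 6))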
/**
* Creates a DataStream that represents the Strings produced by reading the
* given file line wise. The file will be read with the system's default
* character set.
*
*/
def readTextFile(filePath: String): DataStream[String] =
javaEnv.readTextFile(filePath)
asScalaStream(javaEnv.readTextFile(filePath))
/**
* Creates a data stream that represents the Strings produced by reading the given file
* line wise. The character set with the given name will be used to read the files.
*/
def readTextFile(filePath: String, charsetName: String): DataStream[String] =
javaEnv.readTextFile(filePath, charsetName)
asScalaStream(javaEnv.readTextFile(filePath, charsetName))
/**
......@@ -449,8 +448,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
*/
def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
DataStream[T] =
javaEnv.readFile(inputFormat, filePath)
DataStream[T] =
asScalaStream(javaEnv.readFile(inputFormat, filePath))
/**
......@@ -461,9 +460,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* every 100 milliseconds.
*
*/
def readFileStream(StreamPath: String, intervalMillis: Long = 100, watchType: WatchType =
WatchType.ONLY_NEW_FILES): DataStream[String] =
javaEnv.readFileStream(StreamPath, intervalMillis, watchType)
def readFileStream(StreamPath: String, intervalMillis: Long = 100,
watchType: WatchType = WatchType.ONLY_NEW_FILES): DataStream[String] =
asScalaStream(javaEnv.readFileStream(StreamPath, intervalMillis, watchType))
/**
* Creates a new DataStream that contains the strings received infinitely
......@@ -473,8 +472,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
@PublicEvolving
def socketTextStream(hostname: String, port: Int, delimiter: Char = '\n', maxRetry: Long = 0):
DataStream[String] =
javaEnv.socketTextStream(hostname, port)
DataStream[String] =
asScalaStream(javaEnv.socketTextStream(hostname, port, delimiter, maxRetry))
/**
* Generic method to create an input data stream with a specific input format.
......@@ -484,7 +483,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
@PublicEvolving
def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
javaEnv.createInput(inputFormat)
asScalaStream(javaEnv.createInput(inputFormat))
/**
* Create a DataStream using a user defined source function for arbitrary
......@@ -497,15 +496,15 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*/
def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")
val cleanFun = scalaClean(function)
val typeInfo = implicitly[TypeInformation[T]]
javaEnv.addSource(cleanFun).returns(typeInfo)
asScalaStream(javaEnv.addSource(cleanFun).returns(typeInfo))
}
/**
* Create a DataStream using a user defined source function for arbitrary
* source functionality.
*
*/
def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
require(function != null, "Function must not be null.")
......
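A hedged sketch of the function-based addSource variant above (SourceContext is SourceFunction.SourceContext; assumes `env`):

import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

// emit 0..99 and finish; a production source would also react to cancellation
val custom: DataStream[Long] = env.addSource { (ctx: SourceContext[Long]) =>
  var i = 0L
  while (i < 100) { ctx.collect(i); i += 1 }
}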
......@@ -102,7 +102,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @return The data stream that is the result of applying the reduce function to the window.
*/
def reduce(function: ReduceFunction[T]): DataStream[T] = {
javaStream.reduce(clean(function))
asScalaStream(javaStream.reduce(clean(function)))
}
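Usage sketch for the keyed counterpart of the windowAll example earlier (hedged; assumes `env`):

import org.apache.flink.streaming.api.windowing.time.Time

// per-key sums over 5-second windows
val perKeySums: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2), ("a", 3))
  .keyBy(_._1)
  .timeWindow(Time.seconds(5))
  .reduce((a, b) => (a._1, a._2 + b._2))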
/**
......@@ -148,7 +148,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
val resultType : TypeInformation[R] = implicitly[TypeInformation[R]]
javaStream.fold(initialValue, function, resultType)
asScalaStream(javaStream.fold(initialValue, function, resultType))
}
/**
......@@ -185,13 +185,15 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
*/
def apply[R: TypeInformation: ClassTag](
function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = {
val cleanFunction = clean(function)
val javaFunction = new WindowFunction[java.lang.Iterable[T], R, K, W] {
def apply(key: K, window: W, input: java.lang.Iterable[T], out: Collector[R]) = {
cleanFunction.apply(key, window, input.asScala, out)
}
}
javaStream.apply(javaFunction, implicitly[TypeInformation[R]])
asScalaStream(javaStream.apply(javaFunction, implicitly[TypeInformation[R]]))
}
/**
......@@ -217,7 +219,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
cleanedFunction(key, window, elements.asScala, out)
}
}
javaStream.apply(applyFunction, implicitly[TypeInformation[R]])
asScalaStream(javaStream.apply(applyFunction, implicitly[TypeInformation[R]]))
}
/**
......@@ -234,7 +236,9 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
def apply[R: TypeInformation: ClassTag](
preAggregator: ReduceFunction[T],
function: WindowFunction[T, R, K, W]): DataStream[R] = {
javaStream.apply(clean(preAggregator), clean(function), implicitly[TypeInformation[R]])
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(clean(preAggregator), clean(function), resultType))
}
/**
......@@ -251,6 +255,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
def apply[R: TypeInformation: ClassTag](
preAggregator: (T, T) => T,
function: (K, W, T, Collector[R]) => Unit): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Reduce function must not be null.")
}
......@@ -269,7 +274,8 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
cleanApply(key, window, input, out)
}
}
javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]])
asScalaStream(javaStream.apply(reducer, applyFunction, implicitly[TypeInformation[R]]))
}
/**
......@@ -288,11 +294,12 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
initialValue: R,
foldFunction: FoldFunction[T, R],
function: WindowFunction[R, R, K, W]): DataStream[R] = {
javaStream.apply(
asScalaStream(javaStream.apply(
initialValue,
clean(foldFunction),
clean(function),
implicitly[TypeInformation[R]])
implicitly[TypeInformation[R]]))
}
/**
......@@ -310,6 +317,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
initialValue: R,
foldFunction: (R, T) => R,
function: (K, W, R, Collector[R]) => Unit): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
}
......@@ -328,10 +336,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
cleanApply(key, window, input, out)
}
}
javaStream.apply(initialValue, folder, applyFunction, implicitly[TypeInformation[R]])
val resultType: TypeInformation[R] = implicitly[TypeInformation[R]]
asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType))
}
// ------------------------------------------------------------------------
// Aggregations on the keyed windows
// ------------------------------------------------------------------------
......
......@@ -18,8 +18,6 @@
package org.apache.flink.streaming.api
import _root_.scala.reflect.ClassTag
import language.experimental.macros
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.{createTuple2TypeInformation => apiTupleCreator}
import org.apache.flink.api.scala.typeutils.{CaseClassTypeInfo, TypeUtils}
......@@ -27,25 +25,44 @@ import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
import org.apache.flink.streaming.api.datastream.{ SplitStream => SplitJavaStream }
import org.apache.flink.streaming.api.datastream.{ ConnectedStreams => ConnectedJavaStreams }
import org.apache.flink.streaming.api.datastream.{ KeyedStream => KeyedJavaStream }
import language.implicitConversions
import language.experimental.macros
package object scala {
// We have this here so that we always have generated TypeInformationS when
// using the Scala API
implicit def createTypeInformation[T]: TypeInformation[T] = macro TypeUtils.createTypeInfo[T]
implicit def javaToScalaStream[R](javaStream: JavaStream[R]): DataStream[R] =
new DataStream[R](javaStream)
implicit def javaToScalaGroupedStream[R, K](javaStream: KeyedJavaStream[R, K]):
KeyedStream[R, K] = new KeyedStream[R, K](javaStream)
/**
* Converts an [[org.apache.flink.streaming.api.datastream.DataStream]] to a
* [[org.apache.flink.streaming.api.scala.DataStream]].
*/
private[flink] def asScalaStream[R](stream: JavaStream[R])
= new DataStream[R](stream)
implicit def javaToScalaSplitStream[R](javaStream: SplitJavaStream[R]): SplitStream[R] =
new SplitStream[R](javaStream)
/**
* Converts an [[org.apache.flink.streaming.api.datastream.KeyedStream]] to a
* [[org.apache.flink.streaming.api.scala.KeyedStream]].
*/
private[flink] def asScalaStream[R, K](stream: KeyedJavaStream[R, K])
= new KeyedStream[R, K](stream)
implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
/**
* Converts an [[org.apache.flink.streaming.api.datastream.SplitStream]] to a
* [[org.apache.flink.streaming.api.scala.SplitStream]].
*/
private[flink] def asScalaStream[R](stream: SplitJavaStream[R])
= new SplitStream[R](stream)
/**
* Converts an [[org.apache.flink.streaming.api.datastream.ConnectedStreams]] to a
* [[org.apache.flink.streaming.api.scala.ConnectedStreams]].
*/
private[flink] def asScalaStream[IN1, IN2](stream: ConnectedJavaStreams[IN1, IN2])
= new ConnectedStreams[IN1, IN2](stream)
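The net effect of this hunk, as a hedged sketch: code inside the flink package now wraps Java streams explicitly instead of relying on an implicit conversion.

// hypothetical helper illustrating the new explicit style
private[flink] def wrapResult[T](javaStream: JavaStream[T]): DataStream[T] =
  asScalaStream(javaStream)  // replaces the old implicit javaToScalaStream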
private[flink] def fieldNames2Indices(
typeInfo: TypeInformation[_],
fields: Array[String]): Array[Int] = {
......