Commit 6f215b50 authored by Gyula Fora

[scala] [streaming] Fixed scala formatting

Parent 75dd021a
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -82,7 +82,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
javaStream match {
case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(dop)
case _ =>
throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot have " +
throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " +
"have " +
"parallelism.")
}
this
@@ -94,7 +95,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
def getParallelism: Int = javaStream match {
case op: SingleOutputStreamOperator[_, _] => op.getParallelism
case _ =>
throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have " +
throw new UnsupportedOperationException("Operator " + javaStream.toString + " does not have" +
" " +
"parallelism.")
}
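For orientation between the hunks, a minimal usage sketch of the two accessors above. It assumes the Scala wrapper exposes a `setParallelism(dop: Int)` method forwarding to the underlying Java operator, as the first hunk suggests; imports for the Scala streaming package are omitted because the diff does not show them.

```scala
val env     = StreamExecutionEnvironment.createLocalEnvironment()
val doubled = env.fromElements(1, 2, 3).map(_ * 2) // backed by a SingleOutputStreamOperator

doubled.setParallelism(4)       // assumed wrapper method; forwards to the Java operator
println(doubled.getParallelism) // 4; other stream kinds throw UnsupportedOperationException
```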
@@ -139,7 +141,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator.
* partitioned by the selected fields. This setting only effects the how the outputs will be
* distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy(fields: Int*): DataStream[T] =
@@ -147,7 +150,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the selected fields. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator.
* partitioned by the selected fields. This setting only effects the how the outputs will be
* distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
@@ -155,7 +159,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the given Key. This setting only effects the how the outputs will be distributed between the parallel instances of the next processing operator.
* partitioned by the given Key. This setting only effects the how the outputs will be
* distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
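The three partitioning variants above, side by side: partitioning only changes how records are routed to the parallel instances of the next operator. The `Event` case class is illustrative, and `env` is the local environment from the earlier sketch.

```scala
case class Event(user: String, value: Int)
val events = env.fromElements(Event("alice", 1), Event("bob", 2))

val byKey      = events.partitionBy(e => e.user)           // key-extractor variant
val byField    = events.partitionBy("user")                // field-name variant
val byPosition = env.fromElements((1, "a")).partitionBy(0) // tuple-position variant
```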
@@ -222,7 +227,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*
*/
def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: Long = 0): DataStream[T] = {
def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis:
Long = 0): DataStream[T] = {
val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
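The step function returns a pair: its first stream is fed back into the iteration head, its second continues downstream. A sketch, assuming a `filter` transformation is available on `DataStream` as in the Java API:

```scala
// Decrement until zero: positive values loop back, the rest are emitted.
val done = env.generateSequence(1, 5).iterate(iteration => {
  val minusOne = iteration.map(_ - 1)
  val feedback = minusOne.filter(_ > 0)  // re-enters the loop
  val output   = minusOne.filter(_ <= 0) // leaves the loop
  (feedback, output)
}, maxWaitTimeMillis = 5000)
```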
@@ -252,19 +258,24 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Applies an aggregation that that gives the current minimum element of the data stream by
* the given position. When equality, the user can set to get the first or last element with the minimal value.
* the given position. When equality, the user can set to get the first or last element with
* the minimal value.
*
*/
def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, position, first)
def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType
.MINBY, position, first)
/**
* Applies an aggregation that that gives the current maximum element of the data stream by
* the given position. When equality, the user can set to get the first or last element with the maximal value.
* the given position. When equality, the user can set to get the first or last element with
* the maximal value.
*
*/
def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, position, first)
def maxBy(position: Int, first: Boolean = true): DataStream[T] =
aggregate(AggregationType.MAXBY, position, first)
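On tuple streams the position argument selects the comparison field, and `first` decides which element wins a tie. A small sketch:

```scala
val readings = env.fromElements(("a", 3.2), ("b", 1.4), ("a", 7.9))

val coldest = readings.minBy(1)                // element holding the running minimum of field 1
val hottest = readings.maxBy(1, first = false) // keep the later element on ties
```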
private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): DataStream[T] = {
private def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
DataStream[T] = {
val jStream = javaStream.asInstanceOf[JavaStream[Product]]
val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -272,15 +283,18 @@ class DataStream[T](javaStream: JavaStream[T]) {
val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
val reducer = aggregationType match {
case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()));
case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).
getTypeClass()));
case _ => new agg.ProductComparableAggregator(aggregationType, first)
}
val invokable = jStream match {
case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer, groupedStream.getKeySelector())
case groupedStream: GroupedDataStream[_] => new GroupedReduceInvokable(reducer,
groupedStream.getKeySelector())
case _ => new StreamReduceInvokable(reducer)
}
new DataStream[Product](jStream.transform("aggregation", jStream.getType(), invokable)).asInstanceOf[DataStream[T]]
new DataStream[Product](jStream.transform("aggregation", jStream.getType(),
invokable)).asInstanceOf[DataStream[T]]
}
/**
@@ -288,7 +302,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* received records.
*
*/
def count: DataStream[Long] = new DataStream[java.lang.Long](javaStream.count()).asInstanceOf[DataStream[Long]]
def count: DataStream[Long] = new DataStream[java.lang.Long](
javaStream.count()).asInstanceOf[DataStream[Long]]
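Note that `count` is a running aggregate rather than a terminal value: one `Long` is emitted per input record.

```scala
val totals: DataStream[Long] = readings.count // emits 1, 2, 3 for the three inputs above
```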
/**
* Creates a new DataStream by applying the given function to every element of this DataStream.
@@ -302,7 +317,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
def map(in: T): R = cleanFun(in)
}
new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)))
new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
new MapInvokable[T, R](mapper)))
}
/**
@@ -313,7 +329,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
throw new NullPointerException("Map function must not be null.")
}
new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]], new MapInvokable[T, R](mapper)))
new DataStream(javaStream.transform("map", implicitly[TypeInformation[R]],
new MapInvokable[T, R](mapper)))
}
/**
@@ -324,7 +341,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
if (flatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]], new FlatMapInvokable[T, R](flatMapper)))
new DataStream[R](javaStream.transform("flatMap", implicitly[TypeInformation[R]],
new FlatMapInvokable[T, R](flatMapper)))
}
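The map and flatMap overloads above all funnel into the same invokables. A compact tokenizing sketch, assuming flatMap also accepts a `T => TraversableOnce[R]` function as in the batch Scala API:

```scala
val lines   = env.fromElements("to be", "or not")
val words   = lines.flatMap(line => line.split(" ").toSeq) // one output record per token
val lengths = words.map(_.length)
```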
/**
@@ -358,22 +376,24 @@ class DataStream[T](javaStream: JavaStream[T]) {
}
/**
* Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
* function.
* Creates a new [[DataStream]] by reducing the elements of this DataStream
* using an associative reduce function.
*/
def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.")
}
javaStream match {
case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(), new StreamReduceInvokable[T](reducer)))
case ds: GroupedDataStream[_] => new DataStream[T](javaStream.transform("reduce",
javaStream.getType(), new GroupedReduceInvokable[T](reducer, ds.getKeySelector())))
case _ => new DataStream[T](javaStream.transform("reduce", javaStream.getType(),
new StreamReduceInvokable[T](reducer)))
}
}
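The reduce function must be associative; on a grouped stream it runs per key via `GroupedReduceInvokable`, otherwise over the whole stream. A word-count sketch, assuming the Scala `DataStream` exposes a `groupBy` like the Java API used elsewhere in this diff:

```scala
val wordCounts = words
  .map(w => (w, 1))
  .groupBy(0)                            // assumed grouping by tuple position
  .reduce((a, b) => (a._1, a._2 + b._2)) // per-key running sum
```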
/**
* Creates a new [[DataStream]] by reducing the elements of this DataStream using an associative reduce
* function.
* Creates a new [[DataStream]] by reducing the elements of this DataStream
* using an associative reduce function.
*/
def reduce(fun: (T, T) => T): DataStream[T] = {
if (fun == null) {
@@ -421,7 +441,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* the trigger and eviction policies please use to
* window(List(triggers), List(evicters))
*/
def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
new WindowedDataStream[T](javaStream.window(windowingHelper: _*))
/**
* Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
@@ -430,7 +451,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* use-cases please refer to window(WindowingHelper[_]*)
*
*/
def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
WindowedDataStream[T] = new WindowedDataStream[T](javaStream.window(triggers, evicters))
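Both windowing entry points, sketched with the `Time` and `Count` helpers of the contemporary Java API (`org.apache.flink.streaming.api.windowing.helper`); their exact package at this commit is an assumption, and `readings` is the stream from the earlier sketch.

```scala
import java.util.concurrent.TimeUnit

val timeWindows  = readings.window(Time.of(5, TimeUnit.SECONDS)) // helper-based variant
val countWindows = readings.window(Count.of(100))
// the raw variant takes explicit policies: window(List(trigger), List(evicter))
```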
/**
*
@@ -473,7 +495,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* to use custom join function.
*
*/
def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] = new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
def join[R](stream: DataStream[R]): StreamJoinOperator[T, R] =
new StreamJoinOperator[T, R](javaStream, stream.getJavaStream)
/**
* Initiates a temporal cross transformation that builds all pair
@@ -487,7 +510,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* to use custom join function.
*
*/
def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] = new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
def cross[R](stream: DataStream[R]): StreamCrossOperator[T, R] =
new StreamCrossOperator[T, R](javaStream, stream.getJavaStream)
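A hypothetical sketch of the fluent temporal API assembled from the operator classes later in this diff. Only `join`, `cross`, `where(fieldName, ...)` and the final `apply` are visible in the hunks; `onWindow` and `equalTo` are assumptions inferred from the `TemporalOperator` and `JoinPredicate` structure.

```scala
case class Click(user: String, page: String)
case class Profile(user: String, role: String)
val clicks   = env.fromElements(Click("alice", "/home"))
val profiles = env.fromElements(Profile("alice", "admin"))

val joined = clicks.join(profiles)
  .onWindow(1000)               // window length in millis (assumed)
  .where("user")                // PojoKeySelector on the first input (shown in the diff)
  .equalTo("user")              // key of the second input (assumed)
  .apply((c, p) => (c, p.role)) // custom wrapper per joined pair

val crossed = clicks.cross(profiles)
  .onWindow(1000)               // assumed
  .apply((c, p) => (c, p))      // one result per crossed pair
```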
/**
* Writes a DataStream to the standard output stream (stdout). For each
@@ -504,7 +528,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* is written.
*
*/
def writeAsText(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis))
def writeAsText(path: String, millis: Long = 0): DataStream[T] =
new DataStream[T](javaStream.writeAsText(path, millis))
/**
* Writes a DataStream to the file specified by path in text format. The
@@ -513,7 +538,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* is written.
*
*/
def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis))
def writeAsCsv(path: String, millis: Long = 0): DataStream[T] =
new DataStream[T](javaStream.writeAsCsv(path, millis))
/**
* Adds the given sink to this DataStream. Only streams with sinks added
@@ -521,7 +547,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* method is called.
*
*/
def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] = new DataStream[T](javaStream.addSink(sinkFuntion))
def addSink(sinkFuntion: SinkFunction[T]): DataStream[T] =
new DataStream[T](javaStream.addSink(sinkFuntion))
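These sinks are terminal: nothing runs until the environment executes a topology with at least one sink attached, and the `millis` argument appears to control how often file output is written (0 leaves the default). Sketch, where `results` stands for any `DataStream` and the paths are placeholders; the last line uses the function-literal overload of `addSink` from the following hunk:

```scala
results.writeAsText("/tmp/out.txt")      // default write interval
results.writeAsCsv("/tmp/out.csv", 1000) // write roughly once per second
results.addSink(r => println(r))         // (T => Unit) overload, see the next hunk
```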
/**
* Adds the given sink to this DataStream. Only streams with sinks added
@@ -540,4 +567,4 @@ class DataStream[T](javaStream: JavaStream[T]) {
this.addSink(sinkFunction)
}
}
\ No newline at end of file
}
@@ -44,4 +44,4 @@ class FieldsKeySelector[IN](fields: Int*) extends KeySelector[IN, Tuple] {
case _ => throw new RuntimeException("Only tuple types are supported")
}
}
\ No newline at end of file
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -46,4 +47,4 @@ class SplitDataStream[T](javaStream: SplitJavaStream[T]) {
*/
def selectAll(): DataStream[T] = new DataStream[T](javaStream.selectAll())
}
\ No newline at end of file
}
@@ -35,11 +35,13 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
import org.apache.flink.streaming.api.function.co.CrossWindowFunction
import org.apache.flink.api.common.functions.CrossFunction
class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {
override def createNextWindowOperator(): StreamCrossOperator.CrossWindow[I1, I2] = {
val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this, (l: I1, r: I2) => (l, r))
val crossWindowFunction = StreamCrossOperator.getCrossWindowFunction(this,
(l: I1, r: I2) => (l, r))
val returnType = new CaseClassTypeInfo[(I1, I2)](
@@ -69,24 +71,31 @@ class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extend
}
object StreamCrossOperator {
private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
private[flink] class CrossWindow[I1, I2](op: StreamCrossOperator[I1, I2],
javaStream: JavaStream[(I1, I2)]) extends
DataStream[(I1, I2)](javaStream) {
/**
* Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf call will be emitted.
* Sets a wrapper for the crossed elements. For each crossed pair, the result of the udf
* call will be emitted.
*
*/
def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
val invokable = new CoWindowInvokable[I1, I2, R](
clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
op.timeStamp2)
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable)
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
invokable)
new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
}
}
private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2], crossFunction: (I1, I2) => R): CrossWindowFunction[I1, I2, R] = {
private[flink] def getCrossWindowFunction[I1, I2, R](op: StreamCrossOperator[I1, I2],
crossFunction: (I1, I2) => R):
CrossWindowFunction[I1, I2, R] = {
Validate.notNull(crossFunction, "Join function must not be null.")
val crossFun = new CrossFunction[I1, I2, R] {
@@ -100,4 +109,4 @@ object StreamCrossOperator {
new CrossWindowFunction[I1, I2, R](crossFun)
}
}
\ No newline at end of file
}
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -117,7 +117,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*
*/
def generateSequence(from: Long, to: Long): DataStream[Long] = {
new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).asInstanceOf[DataStream[Long]]
new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).
asInstanceOf[DataStream[Long]]
}
/**
@@ -147,7 +148,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
"elements", typeInfo);
javaEnv.getJobGraphBuilder.addStreamVertex(returnStream.getId(),
new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions.asJavaCollection(data))), null, typeInfo,
new SourceInvokable[T](new FromElementsFunction[T](scala.collection.JavaConversions
.asJavaCollection(data))), null, typeInfo,
"source", 1);
new DataStream(returnStream)
}
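The two sources touched by these hunks, sketched together:

```scala
val env  = StreamExecutionEnvironment.createLocalEnvironment()
val nums = env.generateSequence(1, 100)    // DataStream[Long], unwrapped from java.lang.Long
val abc  = env.fromElements("a", "b", "c") // element type captured via implicit TypeInformation
```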
@@ -204,7 +206,8 @@ object StreamExecutionEnvironment {
* of parallelism of the local environment is the number of hardware contexts (CPU cores/threads).
*/
def createLocalEnvironment(
degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()): StreamExecutionEnvironment = {
degreeOfParallelism: Int = Runtime.getRuntime.availableProcessors()):
StreamExecutionEnvironment = {
new StreamExecutionEnvironment(JavaEnv.createLocalEnvironment(degreeOfParallelism))
}
@@ -223,7 +226,8 @@ object StreamExecutionEnvironment {
* those must be
* provided in the JAR files.
*/
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): StreamExecutionEnvironment = {
def createRemoteEnvironment(host: String, port: Int, jarFiles: String*):
StreamExecutionEnvironment = {
new StreamExecutionEnvironment(JavaEnv.createRemoteEnvironment(host, port, jarFiles: _*))
}
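Both factory methods side by side; host, port and jar path are placeholders (6123 is merely the JobManager's conventional default port), and the listed jars are shipped to the cluster:

```scala
val local  = StreamExecutionEnvironment.createLocalEnvironment() // defaults to available cores
val remote = StreamExecutionEnvironment.createRemoteEnvironment(
  "jobmanager-host", 6123, "/path/to/job.jar")
```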
@@ -251,4 +255,4 @@ object StreamExecutionEnvironment {
javaEnv.setDegreeOfParallelism(degreeOfParallelism)
new StreamExecutionEnvironment(javaEnv)
}
}
\ No newline at end of file
}
@@ -33,7 +33,8 @@ import scala.reflect.ClassTag
import org.apache.commons.lang.Validate
import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {
override def createNextWindowOperator() = {
new StreamJoinOperator.JoinWindow[I1, I2](this)
@@ -61,7 +62,8 @@ object StreamJoinOperator {
* to define the second key.
*/
def where(firstField: String, otherFields: String*) = {
new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(), (firstField +: otherFields): _*))
new JoinPredicate[I1, I2](op, new PojoKeySelector[I1](op.input1.getType(),
(firstField +: otherFields): _*))
}
/**
@@ -82,7 +84,8 @@
}
class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2], private[flink] val keys1: KeySelector[I1, _]) {
class JoinPredicate[I1, I2](private[flink] val op: StreamJoinOperator[I1, I2],
private[flink] val keys1: KeySelector[I1, _]) {
private[flink] var keys2: KeySelector[I2, _] = null
/**
@@ -145,30 +148,36 @@
}
}
return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2)).addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
return op.input1.groupBy(keys1).connect(op.input2.groupBy(keys2))
.addGeneralWindowCombine(getJoinWindowFunction(this, (_, _)),
returnType, op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
}
}
class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends DataStream[(I1, I2)](javaStream) {
class JoinedStream[I1, I2](jp: JoinPredicate[I1, I2], javaStream: JavaStream[(I1, I2)]) extends
DataStream[(I1, I2)](javaStream) {
private val op = jp.op
/**
* Sets a wrapper for the joined elements. For each joined pair, the result of the udf call will be emitted.
* Sets a wrapper for the joined elements. For each joined pair, the result of the
* udf call will be emitted.
*/
def apply[R: TypeInformation: ClassTag](fun: (I1, I2) => R): DataStream[R] = {
val invokable = new CoWindowInvokable[I1, I2, R](
clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1, op.timeStamp2)
clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
op.timeStamp2)
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(), invokable)
javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
invokable)
new DataStream[R](javaStream.setType(implicitly[TypeInformation[R]]))
}
}
private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2], joinFunction: (I1, I2) => R) = {
private[flink] def getJoinWindowFunction[I1, I2, R](jp: JoinPredicate[I1, I2],
joinFunction: (I1, I2) => R) = {
Validate.notNull(joinFunction, "Join function must not be null.")
val joinFun = new JoinFunction[I1, I2, R] {
@@ -183,4 +192,4 @@ object StreamJoinOperator {
new JoinWindowFunction[I1, I2, R](jp.keys1, jp.keys2, joinFun)
}
}
\ No newline at end of file
}
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -52,7 +52,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
* This controls how often the user defined function will be triggered on
* the window.
*/
def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] = new WindowedDataStream[T](javaStream.every(windowingHelper: _*))
def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
new WindowedDataStream[T](javaStream.every(windowingHelper: _*))
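Here `every` sets the trigger independently of the eviction chosen by `window`. A sliding-window sketch, reusing the assumed `Time` helper and `readings` stream from the earlier sketches:

```scala
val sliding = readings
  .window(Time.of(10, TimeUnit.SECONDS)) // what the window keeps (eviction)
  .every(Time.of(2, TimeUnit.SECONDS))   // how often the function fires (trigger)
```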
/**
* Groups the elements of the WindowedDataStream using the given
@@ -126,12 +127,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
/**
* Applies a reduceGroup transformation on the windowed data stream by reducing
* the current window at every trigger. In contrast with the simple binary reduce operator, groupReduce exposes the whole window through the Iterable interface.
* the current window at every trigger. In contrast with the simple binary reduce operator,
* groupReduce exposes the whole window through the Iterable interface.
* </br>
* </br>
* Whenever possible try to use reduce instead of groupReduce for increased efficiency
*/
def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]): DataStream[R] = {
def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
DataStream[R] = {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.")
}
@@ -140,12 +143,14 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
/**
* Applies a reduceGroup transformation on the windowed data stream by reducing
* the current window at every trigger. In contrast with the simple binary reduce operator, groupReduce exposes the whole window through the Iterable interface.
* the current window at every trigger. In contrast with the simple binary reduce operator,
* groupReduce exposes the whole window through the Iterable interface.
* </br>
* </br>
* Whenever possible try to use reduce instead of groupReduce for increased efficiency
*/
def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit): DataStream[R] = {
def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
DataStream[R] = {
if (fun == null) {
throw new NullPointerException("GroupReduce function must not be null.")
}
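Unlike the pairwise `reduce`, `groupReduce` sees the whole current window through an `Iterable`. A sketch of the function-literal overload computing a per-window average (`Collector` is `org.apache.flink.util.Collector`; the `Count` helper is the same assumption as before):

```scala
val windowedNums = env.generateSequence(1, 100).window(Count.of(10))

val averages = windowedNums.reduceGroup {
  (window: Iterable[Long], out: Collector[Double]) =>
    val xs = window.toSeq
    if (xs.nonEmpty) out.collect(xs.sum.toDouble / xs.size) // one average per trigger
}
```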
@@ -181,16 +186,19 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
* the given position. When equality, returns the first.
*
*/
def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY, position, first)
def maxBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MAXBY,
position, first)
/**
* Applies an aggregation that that gives the minimum element of the window by
* the given position. When equality, returns the first.
*
*/
def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY, position, first)
def minBy(position: Int, first: Boolean = true): DataStream[T] = aggregate(AggregationType.MINBY,
position, first)
def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true): DataStream[T] = {
def aggregate(aggregationType: AggregationType, position: Int, first: Boolean = true):
DataStream[T] = {
val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -198,11 +206,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
val agg = new ScalaStreamingAggregator[Product](jStream.getType().createSerializer(), position)
val reducer = aggregationType match {
case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(outType.getTypeAt(position).getTypeClass()));
case AggregationType.SUM => new agg.Sum(SumFunction.getForClass(
outType.getTypeAt(position).getTypeClass()));
case _ => new agg.ProductComparableAggregator(aggregationType, first)
}
new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
}
}
\ No newline at end of file
}