Commit a8afec3b authored by Stephan Ewen

[FLINK-3421] [streaming scala] Remove unneeded ClassTag context bounds

Parent 6dda5316
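Background for the change, with a minimal sketch: a context bound such as `[R: TypeInformation: ClassTag]` desugars into two implicit parameters, but these streaming Scala API methods only ever consume the `TypeInformation` evidence (to derive serializers), so the `ClassTag` bound forced the compiler to resolve an implicit that was never used. The sketch below uses a stand-in `TypeInfo` trait instead of Flink's real `TypeInformation`, purely to stay self-contained:

```scala
import scala.reflect.ClassTag

// Stand-in for Flink's TypeInformation, just to keep the sketch self-contained.
trait TypeInfo[T]
object TypeInfo {
  implicit val intInfo: TypeInfo[Int] = new TypeInfo[Int] {}
}

object ContextBoundSketch {
  // Before: [R: TypeInfo: ClassTag] desugars to two implicit parameters,
  // but only the TypeInfo evidence is ever consumed.
  def mapOld[R: TypeInfo: ClassTag](xs: Seq[Int])(f: Int => R): Seq[R] = {
    val info = implicitly[TypeInfo[R]] // consumed (in Flink: used to build serializers)
    // the ClassTag evidence is in scope here but never touched
    xs.map(f)
  }

  // After: keep only the bound that is actually used.
  def mapNew[R: TypeInfo](xs: Seq[Int])(f: Int => R): Seq[R] = {
    val info = implicitly[TypeInfo[R]]
    xs.map(f)
  }

  def main(args: Array[String]): Unit =
    println(mapNew(Seq(1, 2, 3))(_ * 2)) // List(2, 4, 6)
}
```

Dropping the unused bound is source-compatible for callers: the compiler simply has one fewer implicit to materialize at each call site.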
@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
-import scala.reflect.ClassTag
import scala.collection.JavaConverters._
/**
@@ -136,7 +134,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
-def fold[R: TypeInformation: ClassTag](
+def fold[R: TypeInformation](
initialValue: R,
function: FoldFunction[T,R]): DataStream[R] = {
@@ -156,7 +154,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
-def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
+def fold[R: TypeInformation](initialValue: R, function: (R, T) => R): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
}
@@ -180,7 +178,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
function: AllWindowFunction[Iterable[T], R, W]): DataStream[R] = {
val cleanedFunction = clean(function)
@@ -203,7 +201,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
function: (W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
val cleanedFunction = clean(function)
@@ -226,7 +224,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
preAggregator: ReduceFunction[T],
function: AllWindowFunction[T, R, W]): DataStream[R] = {
@@ -245,7 +243,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
preAggregator: (T, T) => T,
function: (W, T, Collector[R]) => Unit): DataStream[R] = {
if (function == null) {
@@ -283,7 +281,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
initialValue: R,
preAggregator: FoldFunction[T, R],
function: AllWindowFunction[R, R, W]): DataStream[R] = {
@@ -307,7 +305,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
initialValue: R,
preAggregator: (R, T) => R,
function: (W, R, Collector[R]) => Unit): DataStream[R] = {

@@ -32,8 +32,6 @@ import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
/**
* `CoGroupedStreams` represents two [[DataStream]]s that have been co-grouped.
* A streaming co-group operation is evaluated over elements in a window.
@@ -236,7 +234,7 @@ object CoGroupedStreams {
* Completes the co-group operation with the user function that is executed
* for windowed groups.
*/
-def apply[O: TypeInformation: ClassTag](
+def apply[O: TypeInformation](
fun: (Iterator[T1], Iterator[T2]) => O): DataStream[O] = {
require(fun != null, "CoGroup function must not be null.")
@@ -255,7 +253,7 @@ object CoGroupedStreams {
* Completes the co-group operation with the user function that is executed
* for windowed groups.
*/
-def apply[O: TypeInformation: ClassTag](
+def apply[O: TypeInformation](
fun: (Iterator[T1], Iterator[T2], Collector[O]) => Unit): DataStream[O] = {
require(fun != null, "CoGroup function must not be null.")

@@ -26,8 +26,6 @@ import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStrea
import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
import org.apache.flink.util.Collector
-import scala.reflect.ClassTag
/**
* [[ConnectedStreams]] represents two connected streams of (possibly) different data types.
* Connected streams are useful for cases where operations on one stream directly
@@ -60,7 +58,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @param fun2 Function called per element of the second input.
* @return The resulting data stream.
*/
-def map[R: TypeInformation: ClassTag](fun1: IN1 => R, fun2: IN2 => R):
+def map[R: TypeInformation](fun1: IN1 => R, fun2: IN2 => R):
DataStream[R] = {
if (fun1 == null || fun2 == null) {
@@ -92,7 +90,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return
* The resulting data stream
*/
-def map[R: TypeInformation: ClassTag](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] = {
+def map[R: TypeInformation](coMapper: CoMapFunction[IN1, IN2, R]): DataStream[R] = {
if (coMapper == null) {
throw new NullPointerException("Map function must not be null.")
}
@@ -117,7 +115,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @return
* The resulting data stream.
*/
-def flatMap[R: TypeInformation: ClassTag](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
+def flatMap[R: TypeInformation](coFlatMapper: CoFlatMapFunction[IN1, IN2, R]):
DataStream[R] = {
if (coFlatMapper == null) {
@@ -139,7 +137,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @param fun2 Function called per element of the second input.
* @return The resulting data stream.
*/
-def flatMap[R: TypeInformation: ClassTag](
+def flatMap[R: TypeInformation](
fun1: (IN1, Collector[R]) => Unit,
fun2: (IN2, Collector[R]) => Unit): DataStream[R] = {
@@ -166,7 +164,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* @param fun2 Function called per element of the second input.
* @return The resulting data stream.
*/
-def flatMap[R: TypeInformation: ClassTag](
+def flatMap[R: TypeInformation](
fun1: IN1 => TraversableOnce[R],
fun2: IN2 => TraversableOnce[R]): DataStream[R] = {

@@ -38,7 +38,6 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
@Public
class DataStream[T](stream: JavaStream[T]) {
@@ -447,7 +446,7 @@ class DataStream[T](stream: JavaStream[T]) {
*
*/
@PublicEvolving
-def iterate[R, F: TypeInformation: ClassTag](
+def iterate[R, F: TypeInformation](
stepFunction: ConnectedStreams[T, F] => (DataStream[F], DataStream[R]),
maxWaitTimeMillis:Long): DataStream[R] = {
@@ -464,7 +463,7 @@ class DataStream[T](stream: JavaStream[T]) {
/**
* Creates a new DataStream by applying the given function to every element of this DataStream.
*/
-def map[R: TypeInformation: ClassTag](fun: T => R): DataStream[R] = {
+def map[R: TypeInformation](fun: T => R): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Map function must not be null.")
}
@@ -479,7 +478,7 @@ class DataStream[T](stream: JavaStream[T]) {
/**
* Creates a new DataStream by applying the given function to every element of this DataStream.
*/
-def map[R: TypeInformation: ClassTag](mapper: MapFunction[T, R]): DataStream[R] = {
+def map[R: TypeInformation](mapper: MapFunction[T, R]): DataStream[R] = {
if (mapper == null) {
throw new NullPointerException("Map function must not be null.")
}
@@ -492,7 +491,7 @@ class DataStream[T](stream: JavaStream[T]) {
* Creates a new DataStream by applying the given function to every element and flattening
* the results.
*/
-def flatMap[R: TypeInformation: ClassTag](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
+def flatMap[R: TypeInformation](flatMapper: FlatMapFunction[T, R]): DataStream[R] = {
if (flatMapper == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
@@ -505,7 +504,7 @@ class DataStream[T](stream: JavaStream[T]) {
* Creates a new DataStream by applying the given function to every element and flattening
* the results.
*/
-def flatMap[R: TypeInformation: ClassTag](fun: (T, Collector[R]) => Unit): DataStream[R] = {
+def flatMap[R: TypeInformation](fun: (T, Collector[R]) => Unit): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("FlatMap function must not be null.")
}
@@ -520,7 +519,7 @@ class DataStream[T](stream: JavaStream[T]) {
* Creates a new DataStream by applying the given function to every element and flattening
* the results.
*/
-def flatMap[R: TypeInformation: ClassTag](fun: T => TraversableOnce[R]): DataStream[R] = {
+def flatMap[R: TypeInformation](fun: T => TraversableOnce[R]): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("FlatMap function must not be null.")
}

@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
-import scala.reflect.ClassTag
/**
* `JoinedStreams` represents two [[DataStream]]s that have been joined.
* A streaming join operation is evaluated over elements in a window.
@@ -233,7 +231,7 @@ object JoinedStreams {
* Completes the join operation with the user function that is executed
* for windowed groups.
*/
-def apply[O: TypeInformation: ClassTag](fun: (T1, T2) => O): DataStream[O] = {
+def apply[O: TypeInformation](fun: (T1, T2) => O): DataStream[O] = {
require(fun != null, "Join function must not be null.")
val joiner = new FlatJoinFunction[T1, T2, O] {
@@ -249,7 +247,7 @@ object JoinedStreams {
* Completes the join operation with the user function that is executed
* for windowed groups.
*/
-def apply[O: TypeInformation: ClassTag](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
+def apply[O: TypeInformation](fun: (T1, T2, Collector[O]) => Unit): DataStream[O] = {
require(fun != null, "Join function must not be null.")
val joiner = new FlatJoinFunction[T1, T2, O] {

@@ -32,8 +32,6 @@ import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
import org.apache.flink.util.Collector
-import scala.reflect.ClassTag
@Public
class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T](javaStream) {
@@ -151,7 +149,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* using an associative fold function and an initial value. An independent
* aggregate is kept per key.
*/
-def fold[R: TypeInformation: ClassTag](initialValue: R, folder: FoldFunction[T,R]):
+def fold[R: TypeInformation](initialValue: R, folder: FoldFunction[T,R]):
DataStream[R] = {
if (folder == null) {
throw new NullPointerException("Fold function must not be null.")
@@ -168,7 +166,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* using an associative fold function and an initial value. An independent
* aggregate is kept per key.
*/
-def fold[R: TypeInformation: ClassTag](initialValue: R, fun: (R,T) => R): DataStream[R] = {
+def fold[R: TypeInformation](initialValue: R, fun: (R,T) => R): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Fold function must not be null.")
}
@@ -321,7 +319,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
*
* Note that the user state object needs to be serializable.
*/
-def mapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+def mapWithState[R: TypeInformation, S: TypeInformation](
fun: (T, Option[S]) => (R, Option[S])): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Map function must not be null.")
@@ -350,7 +348,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
*
* Note that the user state object needs to be serializable.
*/
-def flatMapWithState[R: TypeInformation: ClassTag, S: TypeInformation](
+def flatMapWithState[R: TypeInformation, S: TypeInformation](
fun: (T, Option[S]) => (TraversableOnce[R], Option[S])): DataStream[R] = {
if (fun == null) {
throw new NullPointerException("Flatmap function must not be null.")

@@ -35,7 +35,6 @@ import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.util.SplittableIterator
import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
import _root_.scala.language.implicitConversions
@@ -387,9 +386,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
-def fromElements[T: ClassTag: TypeInformation](data: T*): DataStream[T] = {
-val typeInfo = implicitly[TypeInformation[T]]
-fromCollection(data)(implicitly[ClassTag[T]], typeInfo)
+def fromElements[T: TypeInformation](data: T*): DataStream[T] = {
+fromCollection(data)
}
/**
@@ -399,7 +397,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
-def fromCollection[T: ClassTag: TypeInformation](data: Seq[T]): DataStream[T] = {
+def fromCollection[T: TypeInformation](data: Seq[T]): DataStream[T] = {
require(data != null, "Data must not be null.")
val typeInfo = implicitly[TypeInformation[T]]
@@ -413,7 +411,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Note that this operation will result in a non-parallel data source, i.e. a data source with
* a parallelism of one.
*/
-def fromCollection[T: ClassTag : TypeInformation] (data: Iterator[T]): DataStream[T] = {
+def fromCollection[T: TypeInformation] (data: Iterator[T]): DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.fromCollection(data.asJava, typeInfo))
}
@@ -421,7 +419,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
/**
* Creates a DataStream from the given [[SplittableIterator]].
*/
-def fromParallelCollection[T: ClassTag : TypeInformation] (data: SplittableIterator[T]):
+def fromParallelCollection[T: TypeInformation] (data: SplittableIterator[T]):
DataStream[T] = {
val typeInfo = implicitly[TypeInformation[T]]
asScalaStream(javaEnv.fromParallelCollection(data, typeInfo))
@@ -447,7 +445,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Reads the given file with the given input format. The file path should be passed
* as a URI (e.g., "file:///some/local/file" or "hdfs://host:port/file/path").
*/
-def readFile[T: ClassTag : TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
+def readFile[T: TypeInformation](inputFormat: FileInputFormat[T], filePath: String):
DataStream[T] =
asScalaStream(javaEnv.readFile(inputFormat, filePath))
@@ -482,7 +480,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* data type by reflection, unless the input format implements the ResultTypeQueryable interface.
*/
@PublicEvolving
-def createInput[T: ClassTag : TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
+def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
asScalaStream(javaEnv.createInput(inputFormat))
/**
@@ -494,7 +492,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* To change this afterwards call DataStreamSource.setParallelism(int)
*
*/
-def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
+def addSource[T: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
require(function != null, "Function must not be null.")
val cleanFun = scalaClean(function)
@@ -506,7 +504,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Create a DataStream using a user defined source function for arbitrary
* source functionality.
*/
-def addSource[T: ClassTag: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
+def addSource[T: TypeInformation](function: SourceContext[T] => Unit): DataStream[T] = {
require(function != null, "Function must not be null.")
val sourceFunction = new SourceFunction[T] {
val cleanFun = scalaClean(function)
@@ -522,10 +520,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
-* <p>
+*
* The program execution will be logged and displayed with a generated
* default name.
-*
*/
def execute() = javaEnv.execute()
@@ -533,9 +530,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Triggers the program execution. The environment will execute all parts of
* the program that have resulted in a "sink" operation. Sink operations are
* for example printing results or forwarding them to a message queue.
-* <p>
-* The program execution will be logged and displayed with the provided name
-*
-*
+*
+* The program execution will be logged and displayed with the provided name.
*/
def execute(jobName: String) = javaEnv.execute(jobName)
@@ -544,7 +540,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* returns it as a String using a JSON representation of the execution data
* flow graph. Note that this needs to be called, before the plan is
* executed.
-*
*/
def getExecutionPlan = javaEnv.getExecutionPlan
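
As a usage-level illustration (a hedged sketch against the standard streaming Scala API, assuming the implicit `TypeInformation` derivation from the `org.apache.flink.streaming.api.scala` package object is in scope), call sites such as `fromElements` compile identically before and after this commit:

```scala
import org.apache.flink.streaming.api.scala._

// Call sites look the same before and after the change: the package object's
// implicit TypeInformation satisfies [T: TypeInformation], and no caller ever
// had to name the (now removed) ClassTag explicitly.
object FromElementsExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(1, 2, 3) // resolves TypeInformation[Int] implicitly
      .map(_ * 2)
      .print()
    env.execute("classtag-free example")
  }
}
```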

@@ -30,8 +30,6 @@ import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
-import scala.reflect.ClassTag
import scala.collection.JavaConverters._
/**
@@ -139,7 +137,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
-def fold[R: TypeInformation: ClassTag](
+def fold[R: TypeInformation](
initialValue: R,
function: FoldFunction[T,R]): DataStream[R] = {
if (function == null) {
@@ -159,7 +157,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
-def fold[R: TypeInformation: ClassTag](initialValue: R, function: (R, T) => R): DataStream[R] = {
+def fold[R: TypeInformation](initialValue: R, function: (R, T) => R): DataStream[R] = {
if (function == null) {
throw new NullPointerException("Fold function must not be null.")
}
@@ -183,7 +181,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
function: WindowFunction[Iterable[T], R, K, W]): DataStream[R] = {
val cleanFunction = clean(function)
@@ -207,7 +205,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R] = {
if (function == null) {
throw new NullPointerException("WindowApply function must not be null.")
@@ -233,7 +231,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
preAggregator: ReduceFunction[T],
function: WindowFunction[T, R, K, W]): DataStream[R] = {
@@ -252,7 +250,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
preAggregator: (T, T) => T,
function: (K, W, T, Collector[R]) => Unit): DataStream[R] = {
@@ -290,7 +288,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
initialValue: R,
foldFunction: FoldFunction[T, R],
function: WindowFunction[R, R, K, W]): DataStream[R] = {
@@ -313,7 +311,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
-def apply[R: TypeInformation: ClassTag](
+def apply[R: TypeInformation](
initialValue: R,
foldFunction: (R, T) => R,
function: (K, W, R, Collector[R]) => Unit): DataStream[R] = {