Commit 086acf68 authored by Márton Balassi

[streaming] [scala] Exposed environment from DataStream

This is needed for streaming library features and is identical to the batch API.

Closes #1480
Parent 85ac6d3d
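
Below is a minimal sketch of what this change enables, assuming an illustrative job (the object name, input elements, and job name are hypothetical, not part of this commit): a streaming library feature that is handed only a DataStream can now recover the StreamExecutionEnvironment from the stream itself, mirroring what the batch DataSet API already offers.

import org.apache.flink.streaming.api.scala._

object EnvFromStreamExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream: DataStream[String] = env.fromElements("a", "b", "c")

    // Added by this commit: the environment travels with the stream, so
    // library code that only receives a DataStream can still reach it
    // (e.g. to inspect the ExecutionConfig before cleaning closures).
    val sameEnv: StreamExecutionEnvironment = stream.getExecutionEnvironment

    stream.map(_.toUpperCase).print()
    sameEnv.execute("env-from-stream")
  }
}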
@@ -51,7 +51,7 @@ class ScalaStreamingTranslator extends PlanTranslator {
       resultFields: Seq[(String, TypeInformation[_])]): Table = {
     val result =
-      javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields)
+      javaTranslator.createTable(repr.javaStream, inputType, expressions, resultFields)
     new Table(result.operation)
   }
@@ -90,7 +90,7 @@ package object table extends ImplicitExpressionConversions {
       stream: DataStream[T]): DataStreamConversions[T] = {
     new DataStreamConversions[T](
       stream,
-      stream.getJavaStream.getType.asInstanceOf[CompositeType[T]])
+      stream.javaStream.getType.asInstanceOf[CompositeType[T]])
   }

   implicit def table2RowDataStream(
@@ -97,7 +97,7 @@ object CoGroupedStreams {
    * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
    */
  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
  }
}
@@ -170,7 +170,7 @@ object CoGroupedStreams {
    * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
    */
  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
  }
}
@@ -270,7 +270,7 @@ object CoGroupedStreams {
    */
  def apply[T: TypeInformation](function: CoGroupFunction[T1, T2, T]): DataStream[T] = {
-    val coGroup = new JavaCoGroupedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
+    val coGroup = new JavaCoGroupedStreams[T1, T2](input1.javaStream, input2.javaStream)
    coGroup
      .where(keySelector1)
@@ -286,7 +286,7 @@ object CoGroupedStreams {
    * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
    */
  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
  }
}
@@ -39,33 +39,40 @@ import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
import scala.reflect.ClassTag

-class DataStream[T](javaStream: JavaStream[T]) {
+class DataStream[T](stream: JavaStream[T]) {

  /**
   * Gets the underlying java DataStream object.
   */
-  def getJavaStream: JavaStream[T] = javaStream
+  private[flink] def javaStream: JavaStream[T] = stream
+
+  /**
+   * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
+   * @return associated execution environment
+   */
+  def getExecutionEnvironment: StreamExecutionEnvironment =
+    new StreamExecutionEnvironment(stream.getExecutionEnvironment)

  /**
   * Returns the ID of the DataStream.
   *
   * @return ID of the DataStream
   */
-  def getId = javaStream.getId
+  def getId = stream.getId

  /**
   * Returns the TypeInformation for the elements of this DataStream.
   */
-  def getType(): TypeInformation[T] = javaStream.getType()
+  def getType(): TypeInformation[T] = stream.getType()

  /**
   * Sets the parallelism of this operation. This must be at least 1.
   */
  def setParallelism(parallelism: Int): DataStream[T] = {
-    javaStream match {
+    stream match {
      case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism)
      case _ =>
-        throw new UnsupportedOperationException("Operator " + javaStream.toString + " cannot " +
+        throw new UnsupportedOperationException("Operator " + stream.toString + " cannot " +
          "have " +
          "parallelism.")
    }
@@ -75,12 +82,12 @@ class DataStream[T](javaStream: JavaStream[T]) {
  /**
   * Returns the parallelism of this operation.
   */
-  def getParallelism = javaStream.getParallelism
+  def getParallelism = stream.getParallelism

  /**
   * Returns the execution config.
   */
-  def getExecutionConfig = javaStream.getExecutionConfig
+  def getExecutionConfig = stream.getExecutionConfig

  /**
   * Gets the name of the current data stream. This name is
@@ -88,7 +95,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   * @return Name of the stream.
   */
-  def getName : String = javaStream match {
+  def getName : String = stream match {
    case stream : SingleOutputStreamOperator[T,_] => stream.getName
    case _ => throw new
      UnsupportedOperationException("Only supported for operators.")
@@ -100,7 +107,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   * @return The named operator
   */
-  def name(name: String) : DataStream[T] = javaStream match {
+  def name(name: String) : DataStream[T] = stream match {
    case stream : SingleOutputStreamOperator[T,_] => stream.name(name)
    case _ => throw new UnsupportedOperationException("Only supported for operators.")
    this
@@ -132,7 +139,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   */
  def disableChaining(): DataStream[T] = {
-    javaStream match {
+    stream match {
      case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
      case _ =>
        throw new UnsupportedOperationException("Only supported for operators.")
@@ -147,7 +154,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   */
  def startNewChain(): DataStream[T] = {
-    javaStream match {
+    stream match {
      case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
      case _ =>
        throw new UnsupportedOperationException("Only supported for operators.")
@@ -163,7 +170,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   */
  def isolateResources(): DataStream[T] = {
-    javaStream match {
+    stream match {
      case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
      case _ =>
        throw new UnsupportedOperationException("Only supported for operators.")
@@ -183,7 +190,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * default.
   */
  def startNewResourceGroup(): DataStream[T] = {
-    javaStream match {
+    stream match {
      case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
      case _ =>
        throw new UnsupportedOperationException("Only supported for operators.")
@@ -200,7 +207,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * @return The operator with buffer timeout set.
   */
  def setBufferTimeout(timeoutMillis: Long): DataStream[T] = {
-    javaStream match {
+    stream match {
      case ds: SingleOutputStreamOperator[_, _] => ds.setBufferTimeout(timeoutMillis);
      case _ =>
        throw new UnsupportedOperationException("Only supported for operators.")
@@ -215,7 +222,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   */
  def union(dataStreams: DataStream[T]*): DataStream[T] =
-    javaStream.union(dataStreams.map(_.getJavaStream): _*)
+    stream.union(dataStreams.map(_.javaStream): _*)

  /**
   * Creates a new ConnectedStreams by connecting
@@ -223,21 +230,21 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * DataStreams connected using this operators can be used with CoFunctions.
   */
  def connect[T2](dataStream: DataStream[T2]): ConnectedStreams[T, T2] =
-    javaStream.connect(dataStream.getJavaStream)
+    stream.connect(dataStream.javaStream)

  /**
   * Groups the elements of a DataStream by the given key positions (for tuple/array types) to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
-  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = javaStream.keyBy(fields: _*)
+  def keyBy(fields: Int*): KeyedStream[T, JavaTuple] = stream.keyBy(fields: _*)

  /**
   * Groups the elements of a DataStream by the given field expressions to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
  def keyBy(firstField: String, otherFields: String*): KeyedStream[T, JavaTuple] =
-    javaStream.keyBy(firstField +: otherFields.toArray: _*)
+    stream.keyBy(firstField +: otherFields.toArray: _*)

  /**
   * Groups the elements of a DataStream by the given K key to
   * be used with grouped operators like grouped reduce or grouped aggregations.
@@ -246,26 +253,26 @@ class DataStream[T](javaStream: JavaStream[T]) {
    val cleanFun = clean(fun)
    val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]
    val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
      def getKey(in: T) = cleanFun(in)
      override def getProducedType: TypeInformation[K] = keyType
    }
-    new JavaKeyedStream(javaStream, keyExtractor, keyType)
+    new JavaKeyedStream(stream, keyExtractor, keyType)
  }

  /**
   * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
-  def partitionByHash(fields: Int*): DataStream[T] = javaStream.partitionByHash(fields: _*)
+  def partitionByHash(fields: Int*): DataStream[T] = stream.partitionByHash(fields: _*)

  /**
   * Groups the elements of a DataStream by the given field expressions to
   * be used with grouped operators like grouped reduce or grouped aggregations.
   */
  def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
-    javaStream.partitionByHash(firstField +: otherFields.toArray: _*)
+    stream.partitionByHash(firstField +: otherFields.toArray: _*)

  /**
   * Groups the elements of a DataStream by the given K key to
@@ -278,7 +285,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
      def getKey(in: T) = cleanFun(in)
      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
    }
-    javaStream.partitionByHash(keyExtractor)
+    stream.partitionByHash(keyExtractor)
  }

  /**
@@ -289,7 +296,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * Note: This method works only on single field keys.
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
-    javaStream.partitionCustom(partitioner, field)
+    stream.partitionCustom(partitioner, field)

  /**
   * Partitions a POJO DataStream on the specified key fields using a custom partitioner.
@@ -299,7 +306,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * Note: This method works only on single field keys.
   */
  def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String)
-    : DataStream[T] = javaStream.partitionCustom(partitioner, field)
+    : DataStream[T] = stream.partitionCustom(partitioner, field)

  /**
   * Partitions a DataStream on the key returned by the selector, using a custom partitioner.
@@ -316,7 +323,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
      def getKey(in: T) = cleanFun(in)
      override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
    }
-    javaStream.partitionCustom(partitioner, keyExtractor)
+    stream.partitionCustom(partitioner, keyExtractor)
  }

  /**
@@ -326,14 +333,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * parallel instances of the next processing operator.
   *
   */
-  def broadcast: DataStream[T] = javaStream.broadcast()
+  def broadcast: DataStream[T] = stream.broadcast()

  /**
-   * Sets the partitioning of the DataStream so that the output values all go to 
+   * Sets the partitioning of the DataStream so that the output values all go to
   * the first instance of the next processing operator. Use this setting with care
   * since it might cause a serious performance bottleneck in the application.
   */
-  def global: DataStream[T] = javaStream.global()
+  def global: DataStream[T] = stream.global()

  /**
   * Sets the partitioning of the DataStream so that the output tuples
@@ -342,7 +349,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * processing operator.
   *
   */
-  def shuffle: DataStream[T] = javaStream.shuffle()
+  def shuffle: DataStream[T] = stream.shuffle()

  /**
   * Sets the partitioning of the DataStream so that the output tuples
@@ -352,7 +359,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * instances of the next processing operator.
   *
   */
-  def forward: DataStream[T] = javaStream.forward()
+  def forward: DataStream[T] = stream.forward()

  /**
   * Sets the partitioning of the DataStream so that the output tuples
@@ -361,7 +368,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * the next processing operator.
   *
   */
-  def rebalance: DataStream[T] = javaStream.rebalance()
+  def rebalance: DataStream[T] = stream.rebalance()

  /**
   * Initiates an iterative part of the program that creates a loop by feeding
@@ -379,17 +386,17 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * can use the maxWaitTime parameter to set a max waiting time for the iteration head.
   * If no data received in the set time the stream terminates.
   * <p>
-   * By default the feedback partitioning is set to match the input, to override this set 
+   * By default the feedback partitioning is set to match the input, to override this set
   * the keepPartitioning flag to true
   *
   */
  def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
                 maxWaitTimeMillis:Long = 0,
                 keepPartitioning: Boolean = false) : DataStream[R] = {
-    val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
+    val iterativeStream = stream.iterate(maxWaitTimeMillis)
    val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
-    iterativeStream.closeWith(feedback.getJavaStream)
+    iterativeStream.closeWith(feedback.javaStream)
    output
  }
@@ -416,11 +423,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
  def iterate[R, F: TypeInformation: ClassTag](stepFunction: ConnectedStreams[T, F] =>
        (DataStream[F], DataStream[R]), maxWaitTimeMillis:Long): DataStream[R] = {
    val feedbackType: TypeInformation[F] = implicitly[TypeInformation[F]]
-    val connectedIterativeStream = javaStream.iterate(maxWaitTimeMillis).
+    val connectedIterativeStream = stream.iterate(maxWaitTimeMillis).
      withFeedbackType(feedbackType)
    val (feedback, output) = stepFunction(connectedIterativeStream)
-    connectedIterativeStream.closeWith(feedback.getJavaStream)
+    connectedIterativeStream.closeWith(feedback.javaStream)
    output
  }
@@ -448,9 +455,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
    }
    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    javaStream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
+    stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]]
  }

  /**
   * Creates a new DataStream by applying the given function to every element and flattening
   * the results.
@@ -461,7 +468,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    }
    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    javaStream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
+    stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]]
  }

  /**
@@ -501,7 +508,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
    if (filter == null) {
      throw new NullPointerException("Filter function must not be null.")
    }
-    javaStream.filter(filter)
+    stream.filter(filter)
  }

  /**
@@ -567,7 +574,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * @param slide The slide interval in number of elements.
   */
  def countWindowAll(size: Long, slide: Long): AllWindowedStream[T, GlobalWindow] = {
-    new AllWindowedStream(javaStream.countWindowAll(size, slide))
+    new AllWindowedStream(stream.countWindowAll(size, slide))
  }

  /**
@@ -580,7 +587,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * @param size The size of the windows in number of elements.
   */
  def countWindowAll(size: Long): AllWindowedStream[T, GlobalWindow] = {
-    new AllWindowedStream(javaStream.countWindowAll(size))
+    new AllWindowedStream(stream.countWindowAll(size))
  }

  /**
@@ -600,7 +607,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * @return The trigger windows data stream.
   */
  def windowAll[W <: Window](assigner: WindowAssigner[_ >: T, W]): AllWindowedStream[T, W] = {
-    new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](javaStream, assigner))
+    new AllWindowedStream[T, W](new JavaAllWindowedStream[T, W](stream, assigner))
  }

  /**
   * Extracts a timestamp from an element and assigns it as the internal timestamp of that element.
@@ -614,7 +621,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * @see org.apache.flink.streaming.api.watermark.Watermark
   */
  def assignTimestamps(extractor: TimestampExtractor[T]): DataStream[T] = {
-    javaStream.assignTimestamps(clean(extractor))
+    stream.assignTimestamps(clean(extractor))
  }

  /**
@@ -635,7 +642,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
        cleanExtractor(element)
      }
    }
-    javaStream.assignTimestamps(extractorFunction)
+    stream.assignTimestamps(extractorFunction)
  }

  /**
@@ -644,7 +651,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * OutputSelector. Calling this method on an operator creates a new
   * [[SplitStream]].
   */
-  def split(selector: OutputSelector[T]): SplitStream[T] = javaStream.split(selector)
+  def split(selector: OutputSelector[T]): SplitStream[T] = stream.split(selector)

  /**
   * Creates a new [[SplitStream]] that contains only the elements satisfying the
@@ -685,7 +692,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * written.
   *
   */
-  def print(): DataStreamSink[T] = javaStream.print()
+  def print(): DataStreamSink[T] = stream.print()

  /**
   * Writes a DataStream to the standard output stream (stderr).
@@ -695,7 +702,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   * @return The closed DataStream.
   */
-  def printToErr() = javaStream.printToErr()
+  def printToErr() = stream.printToErr()

  /**
   * Writes a DataStream to the file specified by path in text format. For
@@ -705,7 +712,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * @return The closed DataStream
   */
  def writeAsText(path: String): DataStreamSink[T] =
-    javaStream.writeAsText(path, 0L)
+    stream.writeAsText(path, 0L)

  /**
   * Writes a DataStream to the file specified by path in text format. The
@@ -718,7 +725,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * @return The closed DataStream
   */
  def writeAsText(path: String, millis: Long): DataStreamSink[T] =
-    javaStream.writeAsText(path, millis)
+    stream.writeAsText(path, millis)

  /**
   * Writes a DataStream to the file specified by path in text format. For
@@ -731,9 +738,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
   */
  def writeAsText(path: String, writeMode: FileSystem.WriteMode): DataStreamSink[T] = {
    if (writeMode != null) {
-      javaStream.writeAsText(path, writeMode)
+      stream.writeAsText(path, writeMode)
    } else {
-      javaStream.writeAsText(path)
+      stream.writeAsText(path)
    }
  }
@@ -754,9 +761,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
      millis: Long)
    : DataStreamSink[T] = {
    if (writeMode != null) {
-      javaStream.writeAsText(path, writeMode, millis)
+      stream.writeAsText(path, writeMode, millis)
    } else {
-      javaStream.writeAsText(path, millis)
+      stream.writeAsText(path, millis)
    }
  }
@@ -846,12 +853,12 @@ class DataStream[T](javaStream: JavaStream[T]) {
      rowDelimiter: String,
      fieldDelimiter: String)
    : DataStreamSink[T] = {
-    require(javaStream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
+    require(stream.getType.isTupleType, "CSV output can only be used with Tuple DataSets.")
    val of = new ScalaCsvOutputFormat[Product](new Path(path), rowDelimiter, fieldDelimiter)
    if (writeMode != null) {
      of.setWriteMode(writeMode)
    }
-    javaStream.write(of.asInstanceOf[OutputFormat[T]], millis)
+    stream.write(of.asInstanceOf[OutputFormat[T]], millis)
  }

  /**
@@ -859,7 +866,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * writing is performed periodically, in every millis milliseconds.
   */
  def write(format: OutputFormat[T], millis: Long): DataStreamSink[T] = {
-    javaStream.write(format, millis)
+    stream.write(format, millis)
  }

  /**
@@ -870,7 +877,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
      hostname: String,
      port: Integer,
      schema: SerializationSchema[T]): DataStreamSink[T] = {
-    javaStream.writeToSocket(hostname, port, schema)
+    stream.writeToSocket(hostname, port, schema)
  }

  /**
@@ -880,7 +887,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   *
   */
  def addSink(sinkFunction: SinkFunction[T]): DataStreamSink[T] =
-    javaStream.addSink(sinkFunction)
+    stream.addSink(sinkFunction)

  /**
   * Adds the given sink to this DataStream. Only streams with sinks added
@@ -904,7 +911,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
   */
  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(javaStream.getExecutionEnvironment).scalaClean(f)
+    new StreamExecutionEnvironment(stream.getExecutionEnvironment).scalaClean(f)
  }
}
@@ -95,7 +95,7 @@ object JoinedStreams {
   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
   */
  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
  }
}
@@ -168,7 +168,7 @@ object JoinedStreams {
   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
   */
  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
  }
}
@@ -263,7 +263,7 @@ object JoinedStreams {
   */
  def apply[T: TypeInformation](function: JoinFunction[T1, T2, T]): DataStream[T] = {
-    val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
+    val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
    join
      .where(keySelector1)
@@ -280,7 +280,7 @@ object JoinedStreams {
   */
  def apply[T: TypeInformation](function: FlatJoinFunction[T1, T2, T]): DataStream[T] = {
-    val join = new JavaJoinedStreams[T1, T2](input1.getJavaStream, input2.getJavaStream)
+    val join = new JavaJoinedStreams[T1, T2](input1.javaStream, input2.javaStream)
    join
      .where(keySelector1)
@@ -296,7 +296,7 @@ object JoinedStreams {
   * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]].
   */
  private[flink] def clean[F <: AnyRef](f: F): F = {
-    new StreamExecutionEnvironment(input1.getJavaStream.getExecutionEnvironment).scalaClean(f)
+    new StreamExecutionEnvironment(input1.javaStream.getExecutionEnvironment).scalaClean(f)
  }
}
@@ -63,7 +63,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
        Time.of(100, TimeUnit.MILLISECONDS)))
      .reduce(reducer)

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -82,7 +82,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator
@@ -105,7 +105,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
      .trigger(CountTrigger.of(100))
      .reduce(reducer)

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -128,7 +128,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator
@@ -155,7 +155,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
      .reduce(reducer)

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -179,7 +179,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator
@@ -214,7 +214,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -239,7 +239,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator
@@ -516,7 +516,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
  private def getOperatorForDataStream(dataStream: DataStream[_]): StreamOperator[_] = {
    dataStream.print()
-    val env = dataStream.getJavaStream.getExecutionEnvironment
+    val env = dataStream.javaStream.getExecutionEnvironment
    val streamGraph: StreamGraph = env.getStreamGraph
    streamGraph.getStreamNode(dataStream.getId).getOperator
  }
@@ -98,12 +98,12 @@ class StreamingOperatorsITCase extends ScalaStreamingMultipleProgramsTestBase {
    splittedResult
      .select("0")
      .map(_._2)
-      .getJavaStream
+      .javaStream
      .writeAsText(resultPath1, FileSystem.WriteMode.OVERWRITE)

    splittedResult
      .select("1")
      .map(_._2)
-      .getJavaStream
+      .javaStream
      .writeAsText(resultPath2, FileSystem.WriteMode.OVERWRITE)

    val groupedSequence = 0 until numElements groupBy( _ % numKeys)
@@ -37,7 +37,6 @@ class StreamingScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
    val excludedNames = Seq(
      // These are only used internally. Should be internal API but Java doesn't have
      // private[flink].
-      "org.apache.flink.streaming.api.datastream.DataStream.getExecutionEnvironment",
      "org.apache.flink.streaming.api.datastream.DataStream.getType",
      "org.apache.flink.streaming.api.datastream.DataStream.copy",
      "org.apache.flink.streaming.api.datastream.DataStream.transform",
@@ -57,7 +57,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
        Time.of(100, TimeUnit.MILLISECONDS)))
      .reduce(reducer)

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -77,7 +77,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator
@@ -101,7 +101,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
      .trigger(CountTrigger.of(100))
      .reduce(reducer)

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -126,7 +126,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator
@@ -154,7 +154,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
      .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
      .reduce(reducer)

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -180,7 +180,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator
@@ -215,7 +215,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform1 = window1.getJavaStream.getTransformation
+    val transform1 = window1.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator1 = transform1.getOperator
@@ -240,7 +240,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
          out: Collector[(String, Int)]) { }
      })

-    val transform2 = window2.getJavaStream.getTransformation
+    val transform2 = window2.javaStream.getTransformation
      .asInstanceOf[OneInputTransformation[(String, Int), (String, Int)]]

    val operator2 = transform2.getOperator