提交 8cce136b 编写于 作者: S Stephan Ewen

[FLINK-3413] [streaming] Remove implicit Seq -> DataStream conversion

Because the implicit conversion creates a new ExecutionEnvironment, it leads to
strange errors when used withing programs with more than one source.
上级 5b1231dd
......@@ -39,27 +39,29 @@ object StreamingTableFilter {
return
}
val cars = genCarStream().toTable
val env = StreamExecutionEnvironment.getExecutionEnvironment
val cars = env.fromCollection(genCarStream()).toTable
.filter('carId === 0)
.select('carId, 'speed, 'distance + 1000 as 'distance, 'time % 5 as 'time)
.toDataStream[CarEvent]
cars.print()
StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")
env.execute("TopSpeedWindowing")
}
def genCarStream(): DataStream[CarEvent] = {
def genCarStream(): Stream[CarEvent] = {
def nextSpeed(carEvent : CarEvent) : CarEvent =
{
val next =
if (Random.nextBoolean()) min(100, carEvent.speed + 5) else max(0, carEvent.speed - 5)
CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis)
def nextSpeed(carEvent : CarEvent) : CarEvent = {
val next = if (Random.nextBoolean()) min(100, carEvent.speed + 5)
else max(0, carEvent.speed - 5)
CarEvent(carEvent.carId, next, carEvent.distance + next/3.6d,System.currentTimeMillis())
}
def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] =
{
def carStream(speeds : Stream[CarEvent]) : Stream[CarEvent] = {
Thread.sleep(1000)
speeds.append(carStream(speeds.map(nextSpeed)))
}
......
......@@ -46,9 +46,6 @@ package object scala {
implicit def javaToScalaConnectedStream[IN1, IN2](javaStream: ConnectedJavaStreams[IN1, IN2]):
ConnectedStreams[IN1, IN2] = new ConnectedStreams[IN1, IN2](javaStream)
implicit def seqToFlinkSource[T: ClassTag: TypeInformation](scalaSeq: Seq[T]) : DataStream[T] =
StreamExecutionEnvironment.getExecutionEnvironment.fromCollection(scalaSeq)
private[flink] def fieldNames2Indices(
typeInfo: TypeInformation[_],
fields: Array[String]): Array[Int] = {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册