提交 4eabd4db 编写于 作者: S Stephan Ewen

[hotfix] Make DataStream property methods properly Scalaesk

This also includes some minor cleanups

This closes #1689
上级 c2a43c95
......@@ -128,7 +128,7 @@ public class DataStream<T> {
* @return ID of the DataStream
*/
@Internal
public Integer getId() {
public int getId() {
return transformation.getId();
}
......
......@@ -40,7 +40,7 @@ public class StreamNode implements Serializable {
transient private StreamExecutionEnvironment env;
private final Integer id;
private final int id;
private Integer parallelism = null;
private Long bufferTimeout = null;
private final String operatorName;
......@@ -124,7 +124,7 @@ public class StreamNode implements Serializable {
return inEdgeIndices;
}
public Integer getId() {
public int getId() {
return id;
}
......@@ -264,12 +264,11 @@ public class StreamNode implements Serializable {
}
StreamNode that = (StreamNode) o;
return id.equals(that.id);
return id == that.id;
}
@Override
public int hashCode() {
return id.hashCode();
return id;
}
}
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
......@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
......@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.graph;
import org.apache.flink.api.common.ExecutionConfig;
......@@ -39,7 +40,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.util.EvenOddOutputSelector;
import org.apache.flink.streaming.util.NoOpIntMap;
import org.apache.flink.streaming.util.NoOpSink;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
......
......@@ -18,7 +18,8 @@
package org.apache.flink.streaming.api.scala
import org.apache.flink.annotation.{PublicEvolving, Public}
import org.apache.flink.annotation.{Internal, PublicEvolving, Public}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.typeinfo.TypeInformation
......@@ -42,31 +43,80 @@ import scala.collection.JavaConverters._
@Public
class DataStream[T](stream: JavaStream[T]) {
/**
* Gets the underlying java DataStream object.
*/
def javaStream: JavaStream[T] = stream
/**
* Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]].
*
* @return associated execution environment
* @return associated execution environment
* @deprecated Use [[executionEnvironment]] instead
*/
@deprecated
@PublicEvolving
def getExecutionEnvironment: StreamExecutionEnvironment =
new StreamExecutionEnvironment(stream.getExecutionEnvironment)
/**
* Returns the ID of the DataStream.
*
* @return ID of the DataStream
* Returns the TypeInformation for the elements of this DataStream.
*
* @deprecated Use [[dataType]] instead.
*/
@deprecated
@PublicEvolving
def getId = stream.getId
def getType(): TypeInformation[T] = stream.getType()
/**
* Returns the parallelism of this operation.
*
* @deprecated Use [[parallelism]] instead.
*/
@deprecated
@PublicEvolving
def getParallelism = stream.getParallelism
/**
* Returns the execution config.
*
* @deprecated Use [[executionConfig]] instead.
*/
@deprecated
@PublicEvolving
def getExecutionConfig = stream.getExecutionConfig
/**
* Returns the ID of the DataStream.
*/
@Internal
private[flink] def getId = stream.getId()
// --------------------------------------------------------------------------
// Scalaesk accessors
// --------------------------------------------------------------------------
/**
* Gets the underlying java DataStream object.
*/
def javaStream: JavaStream[T] = stream
/**
* Returns the TypeInformation for the elements of this DataStream.
*/
def getType(): TypeInformation[T] = stream.getType()
def dataType: TypeInformation[T] = stream.getType()
/**
* Returns the execution config.
*/
def executionConfig: ExecutionConfig = stream.getExecutionConfig()
/**
* Returns the [[StreamExecutionEnvironment]] associated with this data stream
*/
def executionEnvironment: StreamExecutionEnvironment =
new StreamExecutionEnvironment(stream.getExecutionEnvironment())
/**
* Returns the parallelism of this operation.
*/
def parallelism: Int = stream.getParallelism()
/**
* Sets the parallelism of this operation. This must be at least 1.
......@@ -75,34 +125,36 @@ class DataStream[T](stream: JavaStream[T]) {
stream match {
case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism)
case _ =>
throw new UnsupportedOperationException("Operator " + stream.toString + " cannot " +
"have " +
"parallelism.")
throw new UnsupportedOperationException(
"Operator " + stream + " cannot set the parallelism.")
}
this
}
/**
* Returns the parallelism of this operation.
*/
def getParallelism = stream.getParallelism
/**
* Returns the execution config.
*/
def getExecutionConfig = stream.getExecutionConfig
/**
* Gets the name of the current data stream. This name is
* used by the visualization and logging during runtime.
*
* @return Name of the stream.
*/
def getName : String = stream match {
def name: String = stream match {
case stream : SingleOutputStreamOperator[T,_] => stream.getName
case _ => throw new
UnsupportedOperationException("Only supported for operators.")
}
// --------------------------------------------------------------------------
/**
* Gets the name of the current data stream. This name is
* used by the visualization and logging during runtime.
*
* @return Name of the stream.
* @deprecated Use [[name]] instead
*/
@deprecated
@PublicEvolving
def getName : String = name
/**
* Sets the name of the current data stream. This name is
......@@ -209,6 +261,10 @@ class DataStream[T](stream: JavaStream[T]) {
this
}
// --------------------------------------------------------------------------
// Stream Transformations
// --------------------------------------------------------------------------
/**
* Creates a new DataStream by merging DataStream outputs of
* the same type with each other. The DataStreams merged using this operator
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册