提交 f165c353 编写于 作者: G Gyula Fora

[scala] [streaming] Added support for iterative streams for scala api

上级 7aa68298
......@@ -509,7 +509,7 @@ Unlike in the core API the user does not define the maximum number of iterations
To start an iterative part of the program the user defines the iteration starting point:
~~~java
IterativeDataStream<Integer> iteration = source.iterate();
IterativeDataStream<Integer> iteration = source.iterate(maxWaitTimeMillis);
~~~
The operator applied on the iteration starting point is the head of the iteration, where data is fed back from the iteration tail.
......@@ -529,7 +529,7 @@ iteration.closeWith(tailOperator.select("iterate"));
In these case all output directed to the “iterate” edge would be fed back to the iteration head.
Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. From this it follows that iterative streaming programs run until the user manually stops the program. While this is acceptable under normal circumstances a method is provided to allow iterative programs to shut down automatically if no input received by the iteration head for a predefined number of milliseconds.
To use this function the user needs to call, the `iteration.setMaxWaitTime(millis)` to control the max wait time.
To use this functionality the user needs to add the maxWaitTimeMillis parameter to the `dataStream.iterate(…)` call to control the max wait time.
### Rich functions
The usage of rich functions are essentially the same as in the core Flink API. All transformations that take as argument a user-defined function can instead take a rich function as argument:
......
......@@ -383,14 +383,44 @@ public class DataStream<OUT> {
* the iteration head.
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the {@link IterativeDataStream#setMaxWaitTime} call to set a max
* waiting time for the iteration head. If no data received in the set time,
* the stream terminates.
* can use the the maxWaitTime parameter to set a max waiting time for the
* iteration head. If no data received in the set time, the stream
* terminates.
*
* @return The iterative data stream created.
*/
public IterativeDataStream<OUT> iterate() {
return new IterativeDataStream<OUT>(this);
return new IterativeDataStream<OUT>(this, 0);
}
/**
* Initiates an iterative part of the program that feeds back data streams.
* The iterative part needs to be closed by calling
* {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
* this IterativeDataStream will be the iteration head. The data stream
* given to the {@link IterativeDataStream#closeWith(DataStream)} method is
* the data stream that will be fed back and used as the input for the
* iteration head. A common usage pattern for streaming iterations is to use
* output splitting to send a part of the closing data stream to the head.
* Refer to {@link SingleOutputStreamOperator#split(OutputSelector)} for
* more information.
* <p>
* The iteration edge will be partitioned the same way as the first input of
* the iteration head.
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the the maxWaitTime parameter to set a max waiting time for the
* iteration head. If no data received in the set time, the stream
* terminates.
*
* @param maxWaitTimeMillis
* Number of milliseconds to wait between inputs before shutting
* down
*
* @return The iterative data stream created.
*/
public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
return new IterativeDataStream<OUT>(this, maxWaitTimeMillis);
}
/**
......
......@@ -31,12 +31,12 @@ public class IterativeDataStream<IN> extends
protected Integer iterationID;
protected long waitTime;
protected IterativeDataStream(DataStream<IN> dataStream) {
protected IterativeDataStream(DataStream<IN> dataStream, long maxWaitTime) {
super(dataStream);
setBufferTimeout(dataStream.environment.getBufferTimeout());
iterationID = iterationCount;
iterationCount++;
waitTime = 0;
waitTime = maxWaitTime;
}
protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID, long waitTime) {
......@@ -71,20 +71,6 @@ public class IterativeDataStream<IN> extends
return iterationTail;
}
/**
* Sets the max waiting time for the next record before shutting down the
* stream. If not set, then the user needs to manually kill the process to
* stop.
*
* @param waitTimeMillis
* Max waiting time in milliseconds
* @return The modified DataStream.
*/
public IterativeDataStream<IN> setMaxWaitTime(long waitTimeMillis) {
this.waitTime = waitTimeMillis;
return this;
}
@Override
protected IterativeDataStream<IN> copy() {
return new IterativeDataStream<IN>(this, iterationID, waitTime);
......
......@@ -79,7 +79,7 @@ public class IterateTest {
DataStream<Boolean> source = env.fromElements(false, false, false);
IterativeDataStream<Boolean> iteration = source.iterate().setMaxWaitTime(3000);
IterativeDataStream<Boolean> iteration = source.iterate(3000);
DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).flatMap(
new IterationTail());
......
......@@ -70,12 +70,9 @@ public class IterateExample {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
.setBufferTimeout(1);
// create an iterative data stream from the input
// create an iterative data stream from the input with 5 second timeout
IterativeDataStream<Tuple2<Double, Integer>> it = env.fromCollection(input).shuffle()
.iterate();
// trigger iteration termination if no new data received for 5 seconds
it.setMaxWaitTime(5000);
.iterate(5000);
// apply the step function to add new random value to the tuple and to
// increment the counter and split the output with the output selector
......
......@@ -68,7 +68,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
/**
* Gets the underlying java DataStream object.
*/
private[flink] def getJavaStream: JavaStream[T] = javaStream
def getJavaStream: JavaStream[T] = javaStream
/**
* Sets the degree of parallelism of this operation. This must be greater than 1.
......@@ -199,6 +199,32 @@ class DataStream[T](javaStream: JavaStream[T]) {
*/
def distribute: DataStream[T] = new DataStream[T](javaStream.distribute())
/**
* Initiates an iterative part of the program that creates a loop by feeding
* back data streams. To create a streaming iteration the user needs to define
* a transformation that creates two DataStreams.The first one one is the output
* that will be fed back to the start of the iteration and the second is the output
* stream of the iterative part.
* <p>
* stepfunction: initialStream => (feedback, output)
* <p>
* A common pattern is to use output splitting to create feedback and output DataStream.
* Please refer to the .split(...) method of the DataStream
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the iteration head.
* If no data received in the set time the stream terminates.
*
*
*/
def iterate(stepFunction: DataStream[T] => (DataStream[T], DataStream[T]), maxWaitTimeMillis: Long = 0): DataStream[T] = {
val iterativeStream = javaStream.iterate(maxWaitTimeMillis)
val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
iterativeStream.closeWith(feedback.getJavaStream)
new DataStream[T](output.getJavaStream)
}
/**
* Applies an aggregation that that gives the current maximum of the data stream at
* the given position.
......@@ -231,34 +257,12 @@ class DataStream[T](javaStream: JavaStream[T]) {
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that that gives the current maximum element of the data stream by
* the given position. When equality, returns the first.
*
*/
def maxBy(field: Any): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.maxBy(field))
case field: String => return new DataStream[T](javaStream.maxBy(field))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that that gives the current minimum element of the data stream by
* the given position. When equality, returns the first.
*
*/
def minBy(field: Any): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.minBy(field))
case field: String => return new DataStream[T](javaStream.minBy(field))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
}
/**
* Applies an aggregation that that gives the current minimum element of the data stream by
* the given position. When equality, the user can set to get the first or last element with the minimal value.
*
*/
def minBy(field: Any, first: Boolean): DataStream[T] = field match {
def minBy(field: Any, first: Boolean = true): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.minBy(field, first))
case field: String => return new DataStream[T](javaStream.minBy(field, first))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
......@@ -269,7 +273,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* the given position. When equality, the user can set to get the first or last element with the maximal value.
*
*/
def maxBy(field: Any, first: Boolean): DataStream[T] = field match {
def maxBy(field: Any, first: Boolean = true): DataStream[T] = field match {
case field: Int => return new DataStream[T](javaStream.maxBy(field, first))
case field: String => return new DataStream[T](javaStream.maxBy(field, first))
case _ => throw new IllegalArgumentException("Aggregations are only supported by field position (Int) or field expression (String)")
......@@ -469,15 +473,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* is written.
*
*/
def writeAsText(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis))
/**
* Writes a DataStream to the file specified by path in text format.
* For every element of the DataStream the result of .toString
* is written.
*
*/
def writeAsText(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsText(path))
def writeAsText(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsText(path, millis))
/**
* Writes a DataStream to the file specified by path in text format. The
......@@ -486,15 +482,7 @@ class DataStream[T](javaStream: JavaStream[T]) {
* is written.
*
*/
def writeAsCsv(path: String, millis: Long): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis))
/**
* Writes a DataStream to the file specified by path in text format.
* For every element of the DataStream the result of .toString
* is written.
*
*/
def writeAsCsv(path: String): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path))
def writeAsCsv(path: String, millis: Long = 0): DataStream[T] = new DataStream[T](javaStream.writeAsCsv(path, millis))
/**
* Adds the given sink to this DataStream. Only streams with sinks added
......
......@@ -117,14 +117,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
*
*/
def generateSequence(from: Long, to: Long): DataStream[Long] = {
val source = new SourceFunction[Long] {
override def invoke(out: Collector[Long]) = {
for (i <- from.to(to)) {
out.collect(i)
}
}
}
addSource(source)
new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)).asInstanceOf[DataStream[Long]]
}
/**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册