提交 4a7ba2db 编写于 作者: G Gyula Fora 提交者: mbalassi

[FLINK-1312] [streaming] OutputSelector changed to SAM-type to allow java 8 lambdas for splitting

上级 51c1f677
......@@ -459,36 +459,52 @@ The Reduce operator for the `ConnectedDataStream` applies a simple reduce transf
### Output splitting
Most data stream operators support directed outputs, meaning that different data elements are received by only given outputs. The outputs are referenced by their name given at the point of receiving:
Most data stream operators support directed outputs (output splitting), meaning that different output elements are sent only to specific outputs. The outputs are referenced by their name given at the point of receiving:
~~~java
SplitDataStream<Integer> split = someDataStream.split(outputSelector);
DataStream<Integer> even = split.select("even");
DataStream<Integer> even = split.select("even);
DataStream<Integer> odd = split.select("odd");
~~~
Data streams only receive the elements directed to selected output names. These outputs are directed by implementing a selector function (extending `OutputSelector`):
In the above example the data stream named ‘even’ will only contain elements that are directed to the output named “even”. The user can of course further transform these new stream by for example squaring only the even elements.
Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”…)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
~~~java
void select(OUT value, Collection<String> outputs);
Iterable<String> select(OUT value);
~~~
The data is sent to all the outputs added to the collection outputs (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent. For example:
The data is sent to all the outputs returned in the iterable (referenced by their name). This way the direction of the outputs can be determined by the value of the data sent.
For example to split even and odd numbers:
~~~java
@Override
void select(Integer value, Collection<String> outputs) {
Iterable<String> select(Integer value) {
List<String> outputs = new ArrayList<String>();
if (value % 2 == 0) {
outputs.add("even");
} else {
outputs.add("odd");
}
return outputs;
}
~~~
This output selection allows data streams to listen to multiple outputs, and data points to be sent to multiple outputs. A value is sent to all the outputs specified in the `OutputSelector` and a data stream will receive a value if it has selected any of the outputs the value is sent to. The stream will receive the data at most once.
It is common that a stream listens to all the outputs, so `split.selectAll()` is provided as an alias for explicitly selecting all output names.
Or more compactly we can use lambda expressions in Java 8:
~~~java
SplitDataStream<Integer> split = someDataStream
.split(x -> Arrays.asList(String.valueOf(x % 2)));
~~~
Every output will be emitted to the selected outputs exactly once, even if you add the same output names more than once.
### Iterations
The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the core Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
......
......@@ -17,9 +17,8 @@
package org.apache.flink.streaming.api.collector;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
......@@ -61,7 +60,7 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
super(channelID, serializationDelegate);
this.outputSelector = outputSelector;
this.emitted = new HashSet<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
this.selectAllOutputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
this.selectAllOutputs = new LinkedList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
}
@Override
......@@ -81,19 +80,25 @@ public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
*
*/
protected void emitToOutputs() {
Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
Iterable<String> outputNames = outputSelector.select(streamRecord.getObject());
emitted.clear();
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
try {
output.emit(serializationDelegate);
} catch (Exception e) {
if (LOG.isErrorEnabled()) {
LOG.error("Emit to {} failed due to: {}", output,
StringUtils.stringifyException(e));
}
}
}
emitted.addAll(selectAllOutputs);
for (String outputName : outputNames) {
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> outputList = outputMap
.get(outputName);
try {
for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output : selectAllOutputs) {
if (!emitted.contains(output)) {
output.emit(serializationDelegate);
emitted.add(output);
}
}
if (outputList == null) {
if (LOG.isErrorEnabled()) {
String format = String.format(
......
......@@ -18,45 +18,27 @@
package org.apache.flink.streaming.api.collector;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
/**
* Class for defining an OutputSelector for a {@link SplitDataStream} using the
* {@link SingleOutputStreamOperator#split} call. Every output object of a
* Interface for defining an OutputSelector for a {@link SplitDataStream} using
* the {@link SingleOutputStreamOperator#split} call. Every output object of a
* {@link SplitDataStream} will run through this operator to select outputs.
*
* @param <OUT>
* Type parameter of the split values.
*/
public abstract class OutputSelector<OUT> implements Serializable {
private static final long serialVersionUID = 1L;
private Collection<String> outputs;
public OutputSelector() {
outputs = new ArrayList<String>();
}
Collection<String> getOutputs(OUT outputObject) {
outputs.clear();
select(outputObject, outputs);
return outputs;
}
public interface OutputSelector<OUT> extends Serializable {
/**
* Method for selecting output names for the emitted objects when using the
* {@link SingleOutputStreamOperator#split} method. The values will be
* emitted only to output names which are added to the outputs collection.
* The outputs collection is cleared automatically after each select call.
* emitted only to output names which are contained in the returned
* iterable.
*
* @param value
* Output object for which the output selection should be made.
* @param outputs
* Selected output names should be added to this collection.
*/
public abstract void select(OUT value, Collection<String> outputs);
public Iterable<String> select(OUT value);
}
......@@ -22,13 +22,11 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
......@@ -54,27 +52,31 @@ public class DirectedOutputTest {
}
}
static final class MyOutputSelector extends OutputSelector<Long> {
static final class MyOutputSelector implements OutputSelector<Long> {
private static final long serialVersionUID = 1L;
List<String> outputs = new ArrayList<String>();
@Override
public void select(Long value, Collection<String> outputs) {
public Iterable<String> select(Long value) {
outputs.clear();
if (value % 2 == 0) {
outputs.add(EVEN);
} else {
outputs.add(ODD);
}
if (value == 10L) {
outputs.add(TEN);
}
if (value == 11L) {
outputs.add(NON_SELECTED);
}
return outputs;
}
}
static final class ListSink implements SinkFunction<Long> {
private static final long serialVersionUID = 1L;
......@@ -99,23 +101,23 @@ public class DirectedOutputTest {
}
private static Map<String, List<Long>> outputs = new HashMap<String, List<Long>>();
@Test
public void outputSelectorTest() throws Exception {
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector());
source.select(EVEN).addSink(new ListSink(EVEN));
source.select(ODD, TEN).addSink(new ListSink(ODD_AND_TEN));
source.select(EVEN, ODD).addSink(new ListSink(EVEN_AND_ODD));
source.selectAll().addSink(new ListSink(ALL));
env.executeTest(128);
assertEquals(Arrays.asList(2L, 4L, 6L, 8L, 10L), outputs.get(EVEN));
assertEquals(Arrays.asList(1L, 3L, 5L, 7L, 9L, 10L, 11L), outputs.get(ODD_AND_TEN));
assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(EVEN_AND_ODD));
assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L),
outputs.get(EVEN_AND_ODD));
assertEquals(Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), outputs.get(ALL));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.junit.Test;
public class OutputSelectorTest {
static final class MyOutputSelector extends OutputSelector<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void select(Tuple1<Integer> tuple, Collection<String> outputs) {
for (Integer i = 0; i < tuple.f0; i++) {
outputs.add(i.toString());
}
}
}
@Test
public void testGetOutputs() {
OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
List<String> expectedOutputs = new ArrayList<String>();
expectedOutputs.add("0");
expectedOutputs.add("1");
assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(2)));
expectedOutputs.add("2");
assertEquals(expectedOutputs, selector.getOutputs(new Tuple1<Integer>(3)));
}
}
*/
package org.apache.flink.streaming.api.collector;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple1;
import org.junit.Test;
public class OutputSelectorTest {
static final class MyOutputSelector implements OutputSelector<Tuple1<Integer>> {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> select(Tuple1<Integer> tuple) {
String[] outputs = new String[tuple.f0];
for (Integer i = 0; i < tuple.f0; i++) {
outputs[i] = i.toString();
}
return Arrays.asList(outputs);
}
}
@Test
public void testGetOutputs() {
OutputSelector<Tuple1<Integer>> selector = new MyOutputSelector();
List<String> expectedOutputs = new ArrayList<String>();
expectedOutputs.add("0");
expectedOutputs.add("1");
assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(2)));
expectedOutputs.add("2");
assertEquals(expectedOutputs, selector.select(new Tuple1<Integer>(3)));
}
}
......@@ -18,7 +18,6 @@
package org.apache.flink.streaming.examples.iteration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Random;
......@@ -79,8 +78,8 @@ public class IterateExample {
// apply the step function to add new random value to the tuple and to
// increment the counter and split the output with the output selector
SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle().setBufferTimeout(1)
.split(new MySelector());
SplitDataStream<Tuple2<Double, Integer>> step = it.map(new Step()).shuffle()
.setBufferTimeout(1).split(new MySelector());
// close the iteration by selecting the tuples that were directed to the
// 'iterate' channel in the output selector
......@@ -129,16 +128,18 @@ public class IterateExample {
/**
* OutputSelector testing which tuple needs to be iterated again.
*/
public static class MySelector extends OutputSelector<Tuple2<Double, Integer>> {
public static class MySelector implements OutputSelector<Tuple2<Double, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void select(Tuple2<Double, Integer> value, Collection<String> outputs) {
public Iterable<String> select(Tuple2<Double, Integer> value) {
List<String> output = new ArrayList<String>();
if (value.f0 > 100) {
outputs.add("output");
output.add("output");
} else {
outputs.add("iterate");
output.add("iterate");
}
return output;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册