提交 10a81862 编写于 作者: G Gyula Fora 提交者: mbalassi

[streaming] Replaced partitionBy with groupBy + re-added global partitioning

上级 b22406a6
......@@ -162,11 +162,11 @@ Usage: `dataStream.shuffle()`
* *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
Usage: `dataStream.distribute()`
* *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. The user can define keys by field positions (for tuple and array types), field expressions (for Pojo types) and custom keys using the `KeySelector` interface.
Usage: `dataStream.partitionBy(keys)`
Usage: `dataStream.groupBy(keys)`
* *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
Usage: `dataStream.broadcast()`
* *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
Usage: `operator.setParallelism(1)`
* *Global*: All data are sent to the first instance of the next processing operator. Use this option with care to avoid serious performance bottlenecks.
Usage: `dataStream.global()`
### Sources
......
......@@ -29,10 +29,8 @@ import org.apache.sling.commons.json.JSONException;
* This program demonstrate the use of TwitterSource.
* Its aim is to count the frequency of the languages of tweets
*/
public class TwitterLocal {
public class TwitterTopology {
private static final int PARALLELISM = 1;
private static final int SOURCE_PARALLELISM = 1;
private static final int NUMBEROFTWEETS = 100;
/**
......@@ -69,18 +67,14 @@ public class TwitterLocal {
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment
.createLocalEnvironment(PARALLELISM);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS))
.setParallelism(SOURCE_PARALLELISM);
DataStream<String> streamSource = env.addSource(new TwitterSource(path, NUMBEROFTWEETS));
DataStream<Tuple2<String, Integer>> dataStream = streamSource
.flatMap(new SelectLanguageFlatMap())
.partitionBy(0)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
......
......@@ -67,6 +67,7 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.partitioner.DistributePartitioner;
import org.apache.flink.streaming.partitioner.FieldsPartitioner;
import org.apache.flink.streaming.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
......@@ -239,7 +240,9 @@ public class DataStream<OUT> {
/**
* Groups the elements of a {@link DataStream} by the given key positions to
* be used with grouped operators like
* {@link GroupedDataStream#reduce(ReduceFunction)}
* {@link GroupedDataStream#reduce(ReduceFunction)}</p> This operator also
* affects the partitioning of the stream, by forcing values with the same
* key to go to the same processing instance.
*
* @param fields
* The position of the fields on which the {@link DataStream}
......@@ -259,7 +262,9 @@ public class DataStream<OUT> {
* is either the name of a public field or a getter method with parentheses
* of the {@link DataStream}S underlying type. A dot can be used to drill
* down into objects, as in {@code "field1.getInnerField2()" }. This method
* returns an {@link GroupedDataStream}.
* returns an {@link GroupedDataStream}.</p> This operator also affects the
* partitioning of the stream, by forcing values with the same key to go to
* the same processing instance.
*
* @param fields
* One or more field expressions on which the DataStream will be
......@@ -275,7 +280,10 @@ public class DataStream<OUT> {
/**
* Groups the elements of a {@link DataStream} by the key extracted by the
* {@link KeySelector} to be used with grouped operators like
* {@link GroupedDataStream#reduce(ReduceFunction)}
* {@link GroupedDataStream#reduce(ReduceFunction)}.
* <p/>
* This operator also affects the partitioning of the stream, by forcing
* values with the same key to go to the same processing instance.
*
* @param keySelector
* The {@link KeySelector} that will be used to extract keys for
......@@ -291,42 +299,6 @@ public class DataStream<OUT> {
getType())));
}
/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned by the selected fields. This setting only effects the how the
* outputs will be distributed between the parallel instances of the next
* processing operator.
*
* @param fields
* The fields to partition by.
* @return The DataStream with fields partitioning set.
*/
public DataStream<OUT> partitionBy(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return partitionBy(new KeySelectorUtil.ArrayKeySelector<OUT>(fields));
} else {
return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}
}
/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned by the given field expressions. This setting only effects the
* how the outputs will be distributed between the parallel instances of the
* next processing operator.
*
* @param fields
* The fields expressions to partition by.
* @return The DataStream with fields partitioning set.
*/
public DataStream<OUT> partitionBy(String... fields) {
return partitionBy(new Keys.ExpressionKeys<OUT>(fields, getType()));
}
private DataStream<OUT> partitionBy(Keys<OUT> keys) {
return partitionBy(KeySelectorUtil.getSelectorForKeys(keys, getType()));
}
/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned using the given {@link KeySelector}. This setting only
......@@ -336,7 +308,7 @@ public class DataStream<OUT> {
* @param keySelector
* @return
*/
public DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
protected DataStream<OUT> partitionBy(KeySelector<OUT, ?> keySelector) {
return setConnectionType(new FieldsPartitioner<OUT>(clean(keySelector)));
}
......@@ -389,6 +361,18 @@ public class DataStream<OUT> {
return setConnectionType(new DistributePartitioner<OUT>(false));
}
/**
* Sets the partitioning of the {@link DataStream} so that the output values
* all go to the first instance of the next processing operator. Use this
* setting with care since it might cause a serious performance bottleneck
* in the application.
*
* @return The DataStream with shuffle partitioning set.
*/
public DataStream<OUT> global() {
return setConnectionType(new GlobalPartitioner<OUT>());
}
/**
* Initiates an iterative part of the program that feeds back data streams.
* The iterative part needs to be closed by calling
......@@ -1007,7 +991,8 @@ public class DataStream<OUT> {
protected <R> DataStream<OUT> addIterationSource(Integer iterationID, long waitTime) {
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null, true);
DataStream<R> returnStream = new DataStreamSource<R>(environment, "iterationSource", null,
true);
jobGraphBuilder.addIterationHead(returnStream.getId(), this.getId(), iterationID,
degreeOfParallelism, waitTime);
......
......@@ -24,7 +24,6 @@ import org.apache.commons.lang3.SerializationException;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
......@@ -150,21 +149,6 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
return this;
}
@SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, O> partitionBy(int... keypositions) {
return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keypositions);
}
@SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, O> partitionBy(String... fields) {
return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(fields);
}
@SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, O> partitionBy(KeySelector<OUT, ?> keySelector) {
return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keySelector);
}
@SuppressWarnings("unchecked")
public SingleOutputStreamOperator<OUT, O> broadcast() {
return (SingleOutputStreamOperator<OUT, O>) super.broadcast();
......
......@@ -71,7 +71,7 @@ public class TwitterStream {
DataStream<Tuple2<String, Integer>> tweets = streamSource
// selecting English tweets and splitting to words
.flatMap(new SelectEnglishAndTokenizeFlatMap()).partitionBy(0)
.flatMap(new SelectEnglishAndTokenizeFlatMap())
// returning (word, 1)
.map(new MapFunction<String, Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
......
......@@ -18,6 +18,7 @@
package org.apache.flink.streaming.examples.windowing;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.source.SourceFunction;
......@@ -43,7 +44,13 @@ public class MultiplePoliciesExample {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> stream = env.addSource(new BasicSource())
.groupBy(0)
.groupBy(new KeySelector<String, String>(){
private static final long serialVersionUID = 1L;
@Override
public String getKey(String value) throws Exception {
return value;
}
})
.window(Count.of(2))
.every(Count.of(3), Count.of(5))
.reduceGroup(new Concat());
......
......@@ -126,39 +126,6 @@ class DataStream[T](javaStream: JavaStream[T]) {
javaStream.groupBy(keyExtractor)
}
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the selected fields. This setting only effects the how the outputs will be
* distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy(fields: Int*): DataStream[T] =
javaStream.partitionBy(fields: _*)
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the selected fields. This setting only effects the how the outputs will be
* distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy(firstField: String, otherFields: String*): DataStream[T] =
javaStream.partitionBy(firstField +: otherFields.toArray: _*)
/**
* Sets the partitioning of the DataStream so that the output is
* partitioned by the given Key. This setting only effects the how the outputs will be
* distributed between the parallel instances of the next processing operator.
*
*/
def partitionBy[K: TypeInformation](fun: T => K): DataStream[T] = {
val keyExtractor = new KeySelector[T, K] {
val cleanFun = clean(fun)
def getKey(in: T) = cleanFun(in)
}
javaStream.partitionBy(keyExtractor)
}
/**
* Sets the partitioning of the DataStream so that the output tuples
* are broadcasted to every parallel instance of the next component. This
......@@ -167,6 +134,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
*
*/
def broadcast: DataStream[T] = javaStream.broadcast()
/**
* Sets the partitioning of the DataStream so that the output values all go to
* the first instance of the next processing operator. Use this setting with care
* since it might cause a serious performance bottlenect in the application.
*/
def global: DataStream[T] = javaStream.global()
/**
* Sets the partitioning of the DataStream so that the output tuples
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册