Commit 06a42bf6 authored by Stephan Ewen

[FLINK-3419] [streaming] Drop 'partitionByHash' function, subsumed by 'keyBy()'
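For anyone migrating, the replacement is mechanical: each partitionByHash(...) call becomes the corresponding keyBy(...) call with the same key specifier. A minimal before/after sketch, reusing the illustrative dataStream and key names from the documentation removed below:

{% highlight java %}
// before (removed in this commit): hash partitioning without keyed semantics
dataStream.partitionByHash("someKey");
dataStream.partitionByHash(0);

// after: keyBy() distributes by hash the same way, but returns a KeyedStream
dataStream.keyBy("someKey");
dataStream.keyBy(0);
{% endhighlight %}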

Parent a8afec3b
......@@ -960,18 +960,6 @@ via the following functions.
</tr>
</thead>
<tbody>
<tr>
<td><strong>Hash partitioning</strong><br>DataStream &rarr; DataStream</td>
<td>
<p>
Identical to keyBy but returns a DataStream instead of a KeyedStream.
{% highlight java %}
dataStream.partitionByHash("someKey");
dataStream.partitionByHash(0);
{% endhighlight %}
</p>
</td>
</tr>
<tr>
<td><strong>Custom partitioning</strong><br>DataStream &rarr; DataStream</td>
<td>
......@@ -1080,18 +1068,6 @@ dataStream.broadcast();
</tr>
</thead>
<tbody>
<tr>
<td><strong>Hash partitioning</strong><br>DataStream &rarr; DataStream</td>
<td>
<p>
Identical to keyBy but returns a DataStream instead of a KeyedStream.
{% highlight scala %}
dataStream.partitionByHash("someKey")
dataStream.partitionByHash(0)
{% endhighlight %}
</p>
</td>
</tr>
<tr>
<td><strong>Custom partitioning</strong><br>DataStream &rarr; DataStream</td>
<td>
......
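The difference the removed table rows called out ("returns a DataStream instead of a KeyedStream") is the point of the consolidation: after this commit the only way to get hash distribution is keyBy(), and its result is keyed. A hedged sketch of the type change, assuming an existing DataStream<Tuple2<Long, Long>> named src:

{% highlight java %}
// src is an assumed DataStream<Tuple2<Long, Long>>.
// keyBy() hash-partitions on field 0 exactly where partitionByHash(0)
// did, but the static type is now KeyedStream, so keyed operations
// (reduce, keyed state, windows) become available downstream:
KeyedStream<Tuple2<Long, Long>, Tuple> keyed = src.keyBy(0);
{% endhighlight %}

This is also why the DataStreamTest assertions further down flip from assertFalse(isKeyed(...)) to assertTrue(isKeyed(...)).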
......@@ -187,89 +187,6 @@ public class ConnectedStreams<IN1, IN2> {
inputStream2.keyBy(keySelector2));
}
/**
* PartitionBy operation for connected data stream. Partitions the elements of
* input1 and input2 according to keyPosition1 and keyPosition2.
*
* @param keyPosition1
* The field used to compute the hashcode of the elements in the
* first input stream.
* @param keyPosition2
* The field used to compute the hashcode of the elements in the
* second input stream.
* @return The partitioned {@link ConnectedStreams}
*/
public ConnectedStreams<IN1, IN2> partitionByHash(int keyPosition1, int keyPosition2) {
return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPosition1),
inputStream2.partitionByHash(keyPosition2));
}
/**
* PartitionBy operation for connected data stream. Partitions the elements of
* input1 and input2 according to keyPositions1 and keyPositions2.
*
* @param keyPositions1
* The fields used to group the first input stream.
* @param keyPositions2
* The fields used to group the second input stream.
* @return The partitioned {@link ConnectedStreams}
*/
public ConnectedStreams<IN1, IN2> partitionByHash(int[] keyPositions1, int[] keyPositions2) {
return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPositions1),
inputStream2.partitionByHash(keyPositions2));
}
/**
* PartitionBy operation for connected data stream using key expressions. Partitions
* the elements of input1 and input2 according to field1 and field2. A
* field expression is either the name of a public field or a getter method
* with parentheses of the {@link DataStream}'s underlying type. A dot can be
* used to drill down into objects, as in {@code "field1.getInnerField2()" }
*
* @param field1
* The partitioning expressions for the first input
* @param field2
* The partitioning expressions for the second input
* @return The partitioned {@link ConnectedStreams}
*/
public ConnectedStreams<IN1, IN2> partitionByHash(String field1, String field2) {
return new ConnectedStreams<>(environment, inputStream1.partitionByHash(field1),
inputStream2.partitionByHash(field2));
}
/**
* PartitionBy operation for connected data stream using key expressions. Partitions
* the elements of input1 and input2 according to fields1 and fields2. A
* field expression is either the name of a public field or a getter method
* with parentheses of the {@link DataStream}'s underlying type. A dot can be
* used to drill down into objects, as in {@code "field1.getInnerField2()" }
*
* @param fields1
* The partitioning expressions for the first input
* @param fields2
* The partitioning expressions for the second input
* @return The partitioned {@link ConnectedStreams}
*/
public ConnectedStreams<IN1, IN2> partitionByHash(String[] fields1, String[] fields2) {
return new ConnectedStreams<>(environment, inputStream1.partitionByHash(fields1),
inputStream2.partitionByHash(fields2));
}
/**
* PartitionBy operation for connected data stream. Partitions the elements of
* input1 and input2 using keySelector1 and keySelector2.
*
* @param keySelector1
* The {@link KeySelector} used for partitioning the first input
* @param keySelector2
* The {@link KeySelector} used for partitioning the second input
* @return The partitioned {@link ConnectedStreams}
*/
public ConnectedStreams<IN1, IN2> partitionByHash(KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2) {
return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keySelector1),
inputStream2.partitionByHash(keySelector2));
}
/**
* Applies a CoMap transformation on a {@link ConnectedStreams} and maps
* the output to a common type. The transformation calls a
......
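Each of the five removed ConnectedStreams overloads has a keyBy counterpart with the same signature, so callers again only rename the method. A sketch mirroring the test changes further down (connected is an assumed ConnectedStreams of two Tuple2<Long, Long> streams):

{% highlight java %}
// before (removed): connected.partitionByHash(0, 0);
// after: keyBy co-partitions both inputs on the given key positions
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> coKeyed =
        connected.keyBy(0, 0);

// the array, field-expression, and KeySelector variants map one-to-one:
connected.keyBy(new int[]{0}, new int[]{0});
connected.keyBy("f0", "f0");
{% endhighlight %}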
......@@ -85,7 +85,6 @@ import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
......@@ -278,61 +277,6 @@ public class DataStream<T> {
getType(), getExecutionConfig())));
}
/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned by hashing on the given fields. This setting only
* affects how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
* @param fields The tuple fields that should be used for partitioning
* @return The partitioned DataStream
*
*/
public DataStream<T> partitionByHash(int... fields) {
if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) {
return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType()));
} else {
return partitionByHash(new Keys.ExpressionKeys<>(fields, getType()));
}
}
/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned by hashing on the given fields. This setting only
* affects how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
* @param fields The tuple fields that should be used for partitioning
* @return The partitioned DataStream
*
*/
public DataStream<T> partitionByHash(String... fields) {
return partitionByHash(new Keys.ExpressionKeys<>(fields, getType()));
}
/**
* Sets the partitioning of the {@link DataStream} so that the output is
* partitioned using the given {@link KeySelector}. This setting only
* affects how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
* @param keySelector The function that extracts the key from an element in the Stream
* @return The partitioned DataStream
*/
public DataStream<T> partitionByHash(KeySelector<T, ?> keySelector) {
return setConnectionType(new HashPartitioner<>(clean(keySelector)));
}
// private helper method for partitioning
private DataStream<T> partitionByHash(Keys<T> keys) {
KeySelector<T, ?> keySelector = clean(KeySelectorUtil.getSelectorForKeys(
keys,
getType(),
getExecutionConfig()));
return setConnectionType(new HashPartitioner<>(keySelector));
}
/**
* Partitions a tuple DataStream on the specified key fields using a custom partitioner.
* This method takes the key position to partition on, and a partitioner that accepts the key type.
......
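For jobs that genuinely want to control distribution without keyed semantics, the API that survives this commit is partitionCustom, documented immediately after the removed block above. A hedged sketch in the shape of the Partitioner used by DataStreamTest below (the modulo key mapping is illustrative and assumes non-negative keys):

{% highlight java %}
// src is an assumed DataStream<Tuple2<Long, Long>>; field 0 is the key.
DataStream<Tuple2<Long, Long>> custom = src.partitionCustom(
        new Partitioner<Long>() {
            @Override
            public int partition(Long key, int numPartitions) {
                return (int) (key % numPartitions); // assumes key >= 0
            }
        }, 0);
{% endhighlight %}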
......@@ -202,21 +202,6 @@ public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeS
@Override
public ConnectedStreams<I, F> keyBy(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
@Override
public ConnectedStreams<I, F> partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;}
@Override
public ConnectedStreams<I, F> partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;}
@Override
public ConnectedStreams<I, F> partitionByHash(String field1, String field2) {throw groupingException;}
@Override
public ConnectedStreams<I, F> partitionByHash(String[] fields1, String[] fields2) {throw groupingException;}
@Override
public ConnectedStreams<I, F> partitionByHash(KeySelector<I, ?> keySelector1, KeySelector<F, ?> keySelector2) {throw groupingException;}
}
}
......@@ -64,6 +64,7 @@ import org.junit.Test;
import static org.junit.Assert.*;
@SuppressWarnings("serial")
public class DataStreamTest extends StreamingMultipleProgramsTestBase {
/**
......@@ -225,18 +226,17 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
}).name("testMap");
DataStreamSink<Long> connected = dataStream1.connect(dataStream2)
dataStream1.connect(dataStream2)
.flatMap(new CoFlatMapFunction<Long, Long, Long>() {
private static final long serialVersionUID = 1L;
@Override
public void flatMap1(Long value, Collector<Long> out) throws Exception {
}
public void flatMap1(Long value, Collector<Long> out) throws Exception {}
@Override
public void flatMap2(Long value, Collector<Long> out) throws Exception {
}
public void flatMap2(Long value, Collector<Long> out) throws Exception {}
}).name("testCoFlatMap")
.windowAll(GlobalWindows.create())
.trigger(PurgingTrigger.of(CountTrigger.of(10)))
.fold(0L, new FoldFunction<Long, Long>() {
......@@ -262,7 +262,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
/**
* Tests that {@link DataStream#keyBy} and {@link DataStream#partitionByHash} result in
* Tests that {@link DataStream#keyBy} and {@link DataStream#partitionCustom(Partitioner, int)} result in
* different and correct topologies. Does the same for the {@link ConnectedStreams}.
*/
@Test
......@@ -296,10 +296,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isKeyed(group4));
//Testing DataStream partitioning
DataStream<Tuple2<Long, Long>> partition1 = src1.partitionByHash(0);
DataStream<Tuple2<Long, Long>> partition2 = src1.partitionByHash(1, 0);
DataStream<Tuple2<Long, Long>> partition3 = src1.partitionByHash("f0");
DataStream<Tuple2<Long, Long>> partition4 = src1.partitionByHash(new FirstSelector());
DataStream<Tuple2<Long, Long>> partition1 = src1.keyBy(0);
DataStream<Tuple2<Long, Long>> partition2 = src1.keyBy(1, 0);
DataStream<Tuple2<Long, Long>> partition3 = src1.keyBy("f0");
DataStream<Tuple2<Long, Long>> partition4 = src1.keyBy(new FirstSelector());
int pid1 = createDownStreamId(partition1);
int pid2 = createDownStreamId(partition2);
......@@ -311,10 +311,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid3)));
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid4)));
assertFalse(isKeyed(partition1));
assertFalse(isKeyed(partition3));
assertFalse(isKeyed(partition2));
assertFalse(isKeyed(partition4));
assertTrue(isKeyed(partition1));
assertTrue(isKeyed(partition3));
assertTrue(isKeyed(partition2));
assertTrue(isKeyed(partition4));
// Testing DataStream custom partitioning
Partitioner<Long> longPartitioner = new Partitioner<Long>() {
......@@ -378,19 +378,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isKeyed(connectedGroup5));
//Testing ConnectedStreams partitioning
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition1 = connected.partitionByHash(0, 0);
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition1 = connected.keyBy(0, 0);
Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition2 = connected.keyBy(new int[]{0}, new int[]{0});
Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition3 = connected.partitionByHash("f0", "f0");
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition3 = connected.keyBy("f0", "f0");
Integer connectDownStreamId3 = createDownStreamId(connectedPartition3);
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"});
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"});
Integer connectDownStreamId4 = createDownStreamId(connectedPartition4);
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector());
ConnectedStreams<Tuple2<Long, Long>, Tuple2<Long, Long>> connectedPartition5 = connected.keyBy(new FirstSelector(), new FirstSelector());
Integer connectDownStreamId5 = createDownStreamId(connectedPartition5);
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(),
......@@ -418,11 +418,11 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase {
assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(),
connectDownStreamId5)));
assertFalse(isKeyed(connectedPartition1));
assertFalse(isKeyed(connectedPartition2));
assertFalse(isKeyed(connectedPartition3));
assertFalse(isKeyed(connectedPartition4));
assertFalse(isKeyed(connectedPartition5));
assertTrue(isKeyed(connectedPartition1));
assertTrue(isKeyed(connectedPartition2));
assertTrue(isKeyed(connectedPartition3));
assertTrue(isKeyed(connectedPartition4));
assertTrue(isKeyed(connectedPartition5));
}
/**
......
......@@ -103,7 +103,7 @@ public class PartitionerTest extends StreamingMultipleProgramsTestBase {
// partition by hash
src
.partitionByHash(0)
.keyBy(0)
.map(new SubtaskIndexAssigner())
.addSink(hashPartitionResultSink);
......
......@@ -262,85 +262,6 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2))
}
/**
* Partitions the two connected streams together. After this operation, all
* elements with the same partition key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param keyPosition1 The first stream's partition key field
* @param keyPosition2 The second stream's partition key field
* @return The co-partitioned connected streams
*/
def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.partitionByHash(keyPosition1, keyPosition2))
}
/**
* Partitions the two connected streams together. After this operation, all
* elements with the same partition key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param keyPositions1 The first stream's partition key fields
* @param keyPositions2 The second stream's partition key fields
* @return The co-partitioned connected streams
*/
def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]):
ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.partitionByHash(keyPositions1, keyPositions2))
}
/**
* Partitions the two connected streams together. After this operation, all
* elements with the same partition key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param field1 The first stream's partition key expression
* @param field2 The second stream's partition key expression
* @return The co-partitioned connected streams
*/
def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.partitionByHash(field1, field2))
}
/**
* Partitions the two connected streams together. After this operation, all
* elements with the same partition key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param fields1 The first stream's partition key field expressions
* @param fields2 The second stream's partition key field expressions
* @return The co-partitioned connected streams
*/
def partitionByHash(fields1: Array[String], fields2: Array[String]):
ConnectedStreams[IN1, IN2] = {
asScalaStream(javaStream.partitionByHash(fields1, fields2))
}
/**
* Partitions the two connected streams together. After this operation, all
* elements with the same partition key from both streams will be sent to the
* same parallel instance of the transformation functions.
*
* @param fun1 The first stream's partition key function
* @param fun2 The second stream's partition key function
* @return The co-partitioned connected streams
*/
def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
ConnectedStreams[IN1, IN2] = {
val cleanFun1 = clean(fun1)
val cleanFun2 = clean(fun2)
val keyExtractor1 = new KeySelector[IN1, K] {
def getKey(in: IN1) = cleanFun1(in)
}
val keyExtractor2 = new KeySelector[IN2, L] {
def getKey(in: IN2) = cleanFun2(in)
}
asScalaStream(javaStream.partitionByHash(keyExtractor1, keyExtractor2))
}
/**
* Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning
* is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]]
......
......@@ -269,35 +269,6 @@ class DataStream[T](stream: JavaStream[T]) {
asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
}
/**
* Partitions the elements of a DataStream by the given key positions (for tuple/array types) to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def partitionByHash(fields: Int*): DataStream[T] =
asScalaStream(stream.partitionByHash(fields: _*))
/**
* Groups the elements of a DataStream by the given field expressions to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def partitionByHash(firstField: String, otherFields: String*): DataStream[T] =
asScalaStream(stream.partitionByHash(firstField +: otherFields.toArray: _*))
/**
* Groups the elements of a DataStream by the given K key to
* be used with grouped operators like grouped reduce or grouped aggregations.
*/
def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = {
val cleanFun = clean(fun)
val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]]
}
asScalaStream(stream.partitionByHash(keyExtractor))
}
/**
* Partitions a tuple DataStream on the specified key fields using a custom partitioner.
* This method takes the key position to partition on, and a partitioner that accepts the key
......
......@@ -87,8 +87,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
}
/**
* Tests that {@link DataStream#keyBy} and {@link DataStream#partitionBy(KeySelector)} result in
* different and correct topologies. Does the same for the {@link ConnectedStreams}.
* Tests that [[DataStream.keyBy]] and [[DataStream.partitionCustom]] result in
* different and correct topologies. Does the same for the [[ConnectedStreams]].
*/
@Test
def testPartitioning(): Unit = {
......@@ -114,10 +114,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(isPartitioned(env.getStreamGraph.getStreamEdges(src1.getId, gid4)))
//Testing DataStream partitioning
val partition1: DataStream[_] = src1.partitionByHash(0)
val partition2: DataStream[_] = src1.partitionByHash(1, 0)
val partition3: DataStream[_] = src1.partitionByHash("_1")
val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1)
val partition1: DataStream[_] = src1.keyBy(0)
val partition2: DataStream[_] = src1.keyBy(1, 0)
val partition3: DataStream[_] = src1.keyBy("_1")
val partition4: DataStream[_] = src1.keyBy((x : (Long, Long)) => x._1)
val pid1 = createDownStreamId(partition1)
val pid2 = createDownStreamId(partition2)
......@@ -181,22 +181,22 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
assert(isPartitioned(env.getStreamGraph.getStreamEdges(src2.getId, downStreamId5)))
//Testing ConnectedStreams partitioning
val connectedPartition1: ConnectedStreams[_, _] = connected.partitionByHash(0, 0)
val connectedPartition1: ConnectedStreams[_, _] = connected.keyBy(0, 0)
val connectDownStreamId1: Integer = createDownStreamId(connectedPartition1)
val connectedPartition2: ConnectedStreams[_, _] =
connected.partitionByHash(Array[Int](0), Array[Int](0))
connected.keyBy(Array[Int](0), Array[Int](0))
val connectDownStreamId2: Integer = createDownStreamId(connectedPartition2)
val connectedPartition3: ConnectedStreams[_, _] = connected.partitionByHash("_1", "_1")
val connectedPartition3: ConnectedStreams[_, _] = connected.keyBy("_1", "_1")
val connectDownStreamId3: Integer = createDownStreamId(connectedPartition3)
val connectedPartition4: ConnectedStreams[_, _] =
connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
connected.keyBy(Array[String]("_1"), Array[String]("_1"))
val connectDownStreamId4: Integer = createDownStreamId(connectedPartition4)
val connectedPartition5: ConnectedStreams[_, _] =
connected.partitionByHash(x => x._1, x => x._1)
connected.keyBy(x => x._1, x => x._1)
val connectDownStreamId5: Integer = createDownStreamId(connectedPartition5)
assert(
......@@ -487,17 +487,6 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
val iterated2 = source.iterate((input: DataStream[Int]) =>
(input.map(_ + 1), input.map(_.toString)), 2000)
try {
val invalid = source.iterate((input: ConnectedStreams[Int, String]) => {
val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s)
(head.filter(_ == "2"), head.filter(_ != "2"))
}, 1000).print()
fail()
} catch {
case uoe: UnsupportedOperationException =>
case e: Exception => fail()
}
val sg = env.getStreamGraph
assert(sg.getIterationSourceSinkPairs.size() == 2)
......
......@@ -93,7 +93,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase {
.map(new StatefulCounterFunction())
// -------------- third vertex - reducer and the sink ----------------
.partitionByHash("prefix")
.keyBy("prefix")
.flatMap(new OnceFailingAggregator(failurePos))
.addSink(new ValidatingSink());
}
......
......@@ -80,7 +80,7 @@ public class StreamingScalabilityAndLatency {
env
.addSource(new TimeStampingSource())
.map(new IdMapper<Tuple2<Long, Long>>())
.partitionByHash(0)
.keyBy(0)
.addSink(new TimestampingSink());
env.execute("Partitioning Program");
......