From 06a42bf6d64734621cc6aa92d5823d935c07e02c Mon Sep 17 00:00:00 2001 From: Stephan Ewen Date: Tue, 16 Feb 2016 22:40:29 +0100 Subject: [PATCH] [FLINK-3419] [streaming] Drop 'partitionByHash' function, subsumed by 'keyBy()' --- docs/apis/streaming/index.md | 24 ------ .../api/datastream/ConnectedStreams.java | 83 ------------------- .../streaming/api/datastream/DataStream.java | 56 ------------- .../api/datastream/IterativeStream.java | 15 ---- .../flink/streaming/api/DataStreamTest.java | 50 +++++------ .../flink/streaming/api/PartitionerTest.java | 2 +- .../api/scala/ConnectedStreams.scala | 79 ------------------ .../streaming/api/scala/DataStream.scala | 29 ------- .../streaming/api/scala/DataStreamTest.scala | 33 +++----- .../StateCheckpointedITCase.java | 2 +- .../StreamingScalabilityAndLatency.java | 2 +- 11 files changed, 39 insertions(+), 336 deletions(-) diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md index 08843bc3bae..12e6db6c571 100644 --- a/docs/apis/streaming/index.md +++ b/docs/apis/streaming/index.md @@ -960,18 +960,6 @@ via the following functions. - - Hash partitioning
DataStream → DataStream - -

- Identical to keyBy but returns a DataStream instead of a KeyedStream. - {% highlight java %} -dataStream.partitionByHash("someKey"); -dataStream.partitionByHash(0); - {% endhighlight %} -

- - Custom partitioning
DataStream → DataStream @@ -1080,18 +1068,6 @@ dataStream.broadcast(); - - Hash partitioning
DataStream → DataStream - -

- Identical to keyBy but returns a DataStream instead of a KeyedStream. - {% highlight scala %} -dataStream.partitionByHash("someKey") -dataStream.partitionByHash(0) - {% endhighlight %} -

- - Custom partitioning
DataStream → DataStream diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java index 9da9e34f8e1..8a29995f9eb 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java @@ -187,89 +187,6 @@ public class ConnectedStreams { inputStream2.keyBy(keySelector2)); } - /** - * PartitionBy operation for connected data stream. Partitions the elements of - * input1 and input2 according to keyPosition1 and keyPosition2. - * - * @param keyPosition1 - * The field used to compute the hashcode of the elements in the - * first input stream. - * @param keyPosition2 - * The field used to compute the hashcode of the elements in the - * second input stream. - * @return The partitioned {@link ConnectedStreams} - */ - public ConnectedStreams partitionByHash(int keyPosition1, int keyPosition2) { - return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPosition1), - inputStream2.partitionByHash(keyPosition2)); - } - - /** - * PartitionBy operation for connected data stream. Partitions the elements of - * input1 and input2 according to keyPositions1 and keyPositions2. - * - * @param keyPositions1 - * The fields used to group the first input stream. - * @param keyPositions2 - * The fields used to group the second input stream. - * @return The partitioned {@link ConnectedStreams} - */ - public ConnectedStreams partitionByHash(int[] keyPositions1, int[] keyPositions2) { - return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keyPositions1), - inputStream2.partitionByHash(keyPositions2)); - } - - /** - * PartitionBy operation for connected data stream using key expressions. Partitions - * the elements of input1 and input2 according to field1 and field2. A - * field expression is either the name of a public field or a getter method - * with parentheses of the {@link DataStream}s underlying type. A dot can be - * used to drill down into objects, as in {@code "field1.getInnerField2()" } - * - * @param field1 - * The partitioning expressions for the first input - * @param field2 - * The partitioning expressions for the second input - * @return The partitioned {@link ConnectedStreams} - */ - public ConnectedStreams partitionByHash(String field1, String field2) { - return new ConnectedStreams<>(environment, inputStream1.partitionByHash(field1), - inputStream2.partitionByHash(field2)); - } - - /** - * PartitionBy operation for connected data stream using key expressions. Partitions - * the elements of input1 and input2 according to fields1 and fields2. A - * field expression is either the name of a public field or a getter method - * with parentheses of the {@link DataStream}s underlying type. A dot can be - * used to drill down into objects, as in {@code "field1.getInnerField2()" } - * - * @param fields1 - * The partitioning expressions for the first input - * @param fields2 - * The partitioning expressions for the second input - * @return The partitioned {@link ConnectedStreams} - */ - public ConnectedStreams partitionByHash(String[] fields1, String[] fields2) { - return new ConnectedStreams<>(environment, inputStream1.partitionByHash(fields1), - inputStream2.partitionByHash(fields2)); - } - - /** - * PartitionBy operation for connected data stream. Partitions the elements of - * input1 and input2 using keySelector1 and keySelector2. - * - * @param keySelector1 - * The {@link KeySelector} used for partitioning the first input - * @param keySelector2 - * The {@link KeySelector} used for partitioning the second input - * @return @return The partitioned {@link ConnectedStreams} - */ - public ConnectedStreams partitionByHash(KeySelector keySelector1, KeySelector keySelector2) { - return new ConnectedStreams<>(environment, inputStream1.partitionByHash(keySelector1), - inputStream2.partitionByHash(keySelector2)); - } - /** * Applies a CoMap transformation on a {@link ConnectedStreams} and maps * the output to a common type. The transformation calls a diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 07a91e9b09d..fa24e18bb50 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -85,7 +85,6 @@ import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; -import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -278,61 +277,6 @@ public class DataStream { getType(), getExecutionConfig()))); } - /** - * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned hashing on the given fields. This setting only - * effects the how the outputs will be distributed between the parallel - * instances of the next processing operator. - * - * @param fields The tuple fields that should be used for partitioning - * @return The partitioned DataStream - * - */ - public DataStream partitionByHash(int... fields) { - if (getType() instanceof BasicArrayTypeInfo || getType() instanceof PrimitiveArrayTypeInfo) { - return partitionByHash(KeySelectorUtil.getSelectorForArray(fields, getType())); - } else { - return partitionByHash(new Keys.ExpressionKeys<>(fields, getType())); - } - } - - /** - * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned hashing on the given fields. This setting only - * effects the how the outputs will be distributed between the parallel - * instances of the next processing operator. - * - * @param fields The tuple fields that should be used for partitioning - * @return The partitioned DataStream - * - */ - public DataStream partitionByHash(String... fields) { - return partitionByHash(new Keys.ExpressionKeys<>(fields, getType())); - } - - /** - * Sets the partitioning of the {@link DataStream} so that the output is - * partitioned using the given {@link KeySelector}. This setting only - * effects the how the outputs will be distributed between the parallel - * instances of the next processing operator. - * - * @param keySelector The function that extracts the key from an element in the Stream - * @return The partitioned DataStream - */ - public DataStream partitionByHash(KeySelector keySelector) { - return setConnectionType(new HashPartitioner<>(clean(keySelector))); - } - - //private helper method for partitioning - private DataStream partitionByHash(Keys keys) { - KeySelector keySelector = clean(KeySelectorUtil.getSelectorForKeys( - keys, - getType(), - getExecutionConfig())); - - return setConnectionType(new HashPartitioner<>(keySelector)); - } - /** * Partitions a tuple DataStream on the specified key fields using a custom partitioner. * This method takes the key position to partition on, and a partitioner that accepts the key type. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java index f6b54b7de5a..ab608e02b45 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java @@ -202,21 +202,6 @@ public class IterativeStream extends SingleOutputStreamOperator keyBy(KeySelector keySelector1,KeySelector keySelector2) {throw groupingException;} - - @Override - public ConnectedStreams partitionByHash(int keyPosition1, int keyPosition2) {throw groupingException;} - - @Override - public ConnectedStreams partitionByHash(int[] keyPositions1, int[] keyPositions2) {throw groupingException;} - - @Override - public ConnectedStreams partitionByHash(String field1, String field2) {throw groupingException;} - - @Override - public ConnectedStreams partitionByHash(String[] fields1, String[] fields2) {throw groupingException;} - - @Override - public ConnectedStreams partitionByHash(KeySelector keySelector1, KeySelector keySelector2) {throw groupingException;} } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java index 037afe4092a..502198c3a63 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java @@ -64,6 +64,7 @@ import org.junit.Test; import static org.junit.Assert.*; +@SuppressWarnings("serial") public class DataStreamTest extends StreamingMultipleProgramsTestBase { /** @@ -225,18 +226,17 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { } }).name("testMap"); - DataStreamSink connected = dataStream1.connect(dataStream2) + dataStream1.connect(dataStream2) .flatMap(new CoFlatMapFunction() { - private static final long serialVersionUID = 1L; @Override - public void flatMap1(Long value, Collector out) throws Exception { - } + public void flatMap1(Long value, Collector out) throws Exception {} @Override - public void flatMap2(Long value, Collector out) throws Exception { - } + public void flatMap2(Long value, Collector out) throws Exception {} + }).name("testCoFlatMap") + .windowAll(GlobalWindows.create()) .trigger(PurgingTrigger.of(CountTrigger.of(10))) .fold(0L, new FoldFunction() { @@ -262,7 +262,7 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { } /** - * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionByHash} result in + * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionCustom(Partitioner, int)} result in * different and correct topologies. Does the some for the {@link ConnectedStreams}. */ @Test @@ -296,10 +296,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { assertTrue(isKeyed(group4)); //Testing DataStream partitioning - DataStream> partition1 = src1.partitionByHash(0); - DataStream> partition2 = src1.partitionByHash(1, 0); - DataStream> partition3 = src1.partitionByHash("f0"); - DataStream> partition4 = src1.partitionByHash(new FirstSelector()); + DataStream> partition1 = src1.keyBy(0); + DataStream> partition2 = src1.keyBy(1, 0); + DataStream> partition3 = src1.keyBy("f0"); + DataStream> partition4 = src1.keyBy(new FirstSelector()); int pid1 = createDownStreamId(partition1); int pid2 = createDownStreamId(partition2); @@ -311,10 +311,10 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid3))); assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), pid4))); - assertFalse(isKeyed(partition1)); - assertFalse(isKeyed(partition3)); - assertFalse(isKeyed(partition2)); - assertFalse(isKeyed(partition4)); + assertTrue(isKeyed(partition1)); + assertTrue(isKeyed(partition3)); + assertTrue(isKeyed(partition2)); + assertTrue(isKeyed(partition4)); // Testing DataStream custom partitioning Partitioner longPartitioner = new Partitioner() { @@ -378,19 +378,19 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { assertTrue(isKeyed(connectedGroup5)); //Testing ConnectedStreams partitioning - ConnectedStreams, Tuple2> connectedPartition1 = connected.partitionByHash(0, 0); + ConnectedStreams, Tuple2> connectedPartition1 = connected.keyBy(0, 0); Integer connectDownStreamId1 = createDownStreamId(connectedPartition1); - ConnectedStreams, Tuple2> connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0}); + ConnectedStreams, Tuple2> connectedPartition2 = connected.keyBy(new int[]{0}, new int[]{0}); Integer connectDownStreamId2 = createDownStreamId(connectedPartition2); - ConnectedStreams, Tuple2> connectedPartition3 = connected.partitionByHash("f0", "f0"); + ConnectedStreams, Tuple2> connectedPartition3 = connected.keyBy("f0", "f0"); Integer connectDownStreamId3 = createDownStreamId(connectedPartition3); - ConnectedStreams, Tuple2> connectedPartition4 = connected.partitionByHash(new String[]{"f0"}, new String[]{"f0"}); + ConnectedStreams, Tuple2> connectedPartition4 = connected.keyBy(new String[]{"f0"}, new String[]{"f0"}); Integer connectDownStreamId4 = createDownStreamId(connectedPartition4); - ConnectedStreams, Tuple2> connectedPartition5 = connected.partitionByHash(new FirstSelector(), new FirstSelector()); + ConnectedStreams, Tuple2> connectedPartition5 = connected.keyBy(new FirstSelector(), new FirstSelector()); Integer connectDownStreamId5 = createDownStreamId(connectedPartition5); assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src1.getId(), @@ -418,11 +418,11 @@ public class DataStreamTest extends StreamingMultipleProgramsTestBase { assertTrue(isPartitioned(env.getStreamGraph().getStreamEdges(src2.getId(), connectDownStreamId5))); - assertFalse(isKeyed(connectedPartition1)); - assertFalse(isKeyed(connectedPartition2)); - assertFalse(isKeyed(connectedPartition3)); - assertFalse(isKeyed(connectedPartition4)); - assertFalse(isKeyed(connectedPartition5)); + assertTrue(isKeyed(connectedPartition1)); + assertTrue(isKeyed(connectedPartition2)); + assertTrue(isKeyed(connectedPartition3)); + assertTrue(isKeyed(connectedPartition4)); + assertTrue(isKeyed(connectedPartition5)); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java index a6c6936ba71..d408639885c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/PartitionerTest.java @@ -103,7 +103,7 @@ public class PartitionerTest extends StreamingMultipleProgramsTestBase { // partition by hash src - .partitionByHash(0) + .keyBy(0) .map(new SubtaskIndexAssigner()) .addSink(hashPartitionResultSink); diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala index fbea12a9556..a80937cd7b2 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala @@ -262,85 +262,6 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { asScalaStream(javaStream.keyBy(keyExtractor1, keyExtractor2)) } - /** - * Partitions the two connected streams together. After this operation, all - * elements with the same partition key from both streams will be sent to the - * same parallel instance of the transformation functions. - * - * @param keyPosition1 The first stream's partition key field - * @param keyPosition2 The second stream's partition key field - * @return The co-partitioned connected streams - */ - def partitionByHash(keyPosition1: Int, keyPosition2: Int): ConnectedStreams[IN1, IN2] = { - asScalaStream(javaStream.partitionByHash(keyPosition1, keyPosition2)) - } - - /** - * Partitions the two connected streams together. After this operation, all - * elements with the same partition key from both streams will be sent to the - * same parallel instance of the transformation functions. - * - * @param keyPositions1 The first stream's partition key fields - * @param keyPositions2 The second stream's partition key fields - * @return The co-partitioned connected streams - */ - def partitionByHash(keyPositions1: Array[Int], keyPositions2: Array[Int]): - ConnectedStreams[IN1, IN2] = { - asScalaStream(javaStream.partitionByHash(keyPositions1, keyPositions2)) - } - - /** - * Partitions the two connected streams together. After this operation, all - * elements with the same partition key from both streams will be sent to the - * same parallel instance of the transformation functions. - * - * @param field1 The first stream's partition key expression - * @param field2 The second stream's partition key expression - * @return The co-partitioned connected streams - */ - def partitionByHash(field1: String, field2: String): ConnectedStreams[IN1, IN2] = { - asScalaStream(javaStream.partitionByHash(field1, field2)) - } - - /** - * Partitions the two connected streams together. After this operation, all - * elements with the same partition key from both streams will be sent to the - * same parallel instance of the transformation functions. - * - * @param fields1 The first stream's partition key field expressions - * @param fields2 The second stream's partition key field expressions - * @return The co-partitioned connected streams - */ - def partitionByHash(fields1: Array[String], fields2: Array[String]): - ConnectedStreams[IN1, IN2] = { - asScalaStream(javaStream.partitionByHash(fields1, fields2)) - } - - /** - * Partitions the two connected streams together. After this operation, all - * elements with the same partition key from both streams will be sent to the - * same parallel instance of the transformation functions. - * - * @param fun1 The first stream's partition key function - * @param fun2 The second stream's partition key function - * @return The co-partitioned connected streams - */ - def partitionByHash[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L): - ConnectedStreams[IN1, IN2] = { - - val cleanFun1 = clean(fun1) - val cleanFun2 = clean(fun2) - - val keyExtractor1 = new KeySelector[IN1, K] { - def getKey(in: IN1) = cleanFun1(in) - } - val keyExtractor2 = new KeySelector[IN2, L] { - def getKey(in: IN2) = cleanFun2(in) - } - - asScalaStream(javaStream.partitionByHash(keyExtractor1, keyExtractor2)) - } - /** * Returns a "closure-cleaned" version of the given function. Cleans only if closure cleaning * is not disabled in the [[org.apache.flink.api.common.ExecutionConfig]] diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index fd4bbf32e4f..c35541c3d47 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -269,35 +269,6 @@ class DataStream[T](stream: JavaStream[T]) { asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType)) } - /** - * Partitions the elements of a DataStream by the given key positions (for tuple/array types) to - * be used with grouped operators like grouped reduce or grouped aggregations. - */ - def partitionByHash(fields: Int*): DataStream[T] = - asScalaStream(stream.partitionByHash(fields: _*)) - - /** - * Groups the elements of a DataStream by the given field expressions to - * be used with grouped operators like grouped reduce or grouped aggregations. - */ - def partitionByHash(firstField: String, otherFields: String*): DataStream[T] = - asScalaStream(stream.partitionByHash(firstField +: otherFields.toArray: _*)) - - /** - * Groups the elements of a DataStream by the given K key to - * be used with grouped operators like grouped reduce or grouped aggregations. - */ - def partitionByHash[K: TypeInformation](fun: T => K): DataStream[T] = { - - val cleanFun = clean(fun) - val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] { - def getKey(in: T) = cleanFun(in) - override def getProducedType: TypeInformation[K] = implicitly[TypeInformation[K]] - } - - asScalaStream(stream.partitionByHash(keyExtractor)) - } - /** * Partitions a tuple DataStream on the specified key fields using a custom partitioner. * This method takes the key position to partition on, and a partitioner that accepts the key diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala index 0b4eb863192..1884185e897 100644 --- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala +++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala @@ -87,8 +87,8 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { } /** - * Tests that {@link DataStream#keyBy} and {@link DataStream#partitionBy(KeySelector)} result in - * different and correct topologies. Does the some for the {@link ConnectedStreams}. + * Tests that [[DataStream.keyBy]] and [[DataStream.partitionCustom]] result in + * different and correct topologies. Does the some for the [[ConnectedStreams]]. */ @Test def testPartitioning(): Unit = { @@ -114,10 +114,10 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { assert(isPartitioned(env.getStreamGraph.getStreamEdges(src1.getId, gid4))) //Testing DataStream partitioning - val partition1: DataStream[_] = src1.partitionByHash(0) - val partition2: DataStream[_] = src1.partitionByHash(1, 0) - val partition3: DataStream[_] = src1.partitionByHash("_1") - val partition4: DataStream[_] = src1.partitionByHash((x : (Long, Long)) => x._1) + val partition1: DataStream[_] = src1.keyBy(0) + val partition2: DataStream[_] = src1.keyBy(1, 0) + val partition3: DataStream[_] = src1.keyBy("_1") + val partition4: DataStream[_] = src1.keyBy((x : (Long, Long)) => x._1) val pid1 = createDownStreamId(partition1) val pid2 = createDownStreamId(partition2) @@ -181,22 +181,22 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { assert(isPartitioned(env.getStreamGraph.getStreamEdges(src2.getId, downStreamId5))) //Testing ConnectedStreams partitioning - val connectedPartition1: ConnectedStreams[_, _] = connected.partitionByHash(0, 0) + val connectedPartition1: ConnectedStreams[_, _] = connected.keyBy(0, 0) val connectDownStreamId1: Integer = createDownStreamId(connectedPartition1) val connectedPartition2: ConnectedStreams[_, _] = - connected.partitionByHash(Array[Int](0), Array[Int](0)) + connected.keyBy(Array[Int](0), Array[Int](0)) val connectDownStreamId2: Integer = createDownStreamId(connectedPartition2) - val connectedPartition3: ConnectedStreams[_, _] = connected.partitionByHash("_1", "_1") + val connectedPartition3: ConnectedStreams[_, _] = connected.keyBy("_1", "_1") val connectDownStreamId3: Integer = createDownStreamId(connectedPartition3) val connectedPartition4: ConnectedStreams[_, _] = - connected.partitionByHash(Array[String]("_1"), Array[String]("_1")) + connected.keyBy(Array[String]("_1"), Array[String]("_1")) val connectDownStreamId4: Integer = createDownStreamId(connectedPartition4) val connectedPartition5: ConnectedStreams[_, _] = - connected.partitionByHash(x => x._1, x => x._1) + connected.keyBy(x => x._1, x => x._1) val connectDownStreamId5: Integer = createDownStreamId(connectedPartition5) assert( @@ -487,17 +487,6 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase { val iterated2 = source.iterate((input: DataStream[Int]) => (input.map(_ + 1), input.map(_.toString)), 2000) - try { - val invalid = source.iterate((input: ConnectedStreams[Int, String]) => { - val head = input.partitionByHash(1, 1).map(i => (i + 1).toString, s => s) - (head.filter(_ == "2"), head.filter(_ != "2")) - }, 1000).print() - fail() - } catch { - case uoe: UnsupportedOperationException => - case e: Exception => fail() - } - val sg = env.getStreamGraph assert(sg.getIterationSourceSinkPairs.size() == 2) diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java index 962fe8403d4..9d37b591040 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateCheckpointedITCase.java @@ -93,7 +93,7 @@ public class StateCheckpointedITCase extends StreamFaultToleranceTestBase { .map(new StatefulCounterFunction()) // -------------- third vertex - reducer and the sink ---------------- - .partitionByHash("prefix") + .keyBy("prefix") .flatMap(new OnceFailingAggregator(failurePos)) .addSink(new ValidatingSink()); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index 7bdfc9d6730..ec617b1905f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -80,7 +80,7 @@ public class StreamingScalabilityAndLatency { env .addSource(new TimeStampingSource()) .map(new IdMapper>()) - .partitionByHash(0) + .keyBy(0) .addSink(new TimestampingSink()); env.execute("Partitioning Program"); -- GitLab