diff --git a/docs/apis/streaming/fig/rescale.svg b/docs/apis/streaming/fig/rescale.svg new file mode 100644 index 0000000000000000000000000000000000000000..43eeae9c6ac038ba2fad251dd74a30f3da3b3c0d --- /dev/null +++ b/docs/apis/streaming/fig/rescale.svg @@ -0,0 +1,472 @@ + + + + + + + + + + + + + + + + + + + + + image/svg+xml + + + + + + + + + + + Src + + + + + + Snk + + + + + Map + + + + Map + + + + Map + + + + + + + + + + + + + + + Src + + + + + + Snk + + + + + Map + + + + Map + + + + Map + + + + + + + + + + + + diff --git a/docs/apis/streaming/index.md b/docs/apis/streaming/index.md index 707899af9553b0ef866ac028b63d5eadda798764..31e6df986c6e10853d73f807d136d2b51365d78a 100644 --- a/docs/apis/streaming/index.md +++ b/docs/apis/streaming/index.md @@ -1270,8 +1270,8 @@ dataStream.partitionByHash(0);

Uses a user-defined Partitioner to select the target task for each element. {% highlight java %} -dataStream.partitionCustom(new Partitioner(){...}, "someKey"); -dataStream.partitionCustom(new Partitioner(){...}, 0); +dataStream.partitionCustom(partitioner, "someKey"); +dataStream.partitionCustom(partitioner, 0); {% endhighlight %}

@@ -1282,7 +1282,7 @@ dataStream.partitionCustom(new Partitioner(){...}, 0);

Partitions elements randomly according to a uniform distribution. {% highlight java %} -dataStream.partitionRandom(); +dataStream.shuffle(); {% endhighlight %}

@@ -1299,6 +1299,51 @@ dataStream.rebalance();

+ + Rescaling
DataStream → DataStream + +

+ Partitions elements, round-robin, to a subset of downstream operations. This is + useful if you want to have pipelines where you, for example, fan out from + each parallel instance of a source to a subset of several mappers to distribute load + but don't want the full rebalance that rebalance() would incur. This would require only + local data transfers instead of transferring data over network, depending on + other configuration values such as the number of slots of TaskManagers. +

+

+ The subset of downstream operations to which the upstream operation sends + elements depends on the degree of parallelism of both the upstream and downstream operation. + For example, if the upstream operation has parallelism 2 and the downstream operation + has parallelism 4, then one upstream operation would distribute elements to two + downstream operations while the other upstream operation would distribute to the other + two downstream operations. If, on the other hand, the downstream operation has parallelism + 2 while the upstream operation has parallelism 4 then two upstream operations would + distribute to one downstream operation while the other two upstream operations would + distribute to the other downstream operations. +

+

+ In cases where the different parallelisms are not multiples of each other one or several + downstream operations will have a differing number of inputs from upstream operations. + +

+

+ Please see this figure for a visualization of the connection pattern in the above + example: +

+ +
+ Checkpoint barriers in data streams +
+ + +

+ {% highlight java %} +dataStream.rescale(); + {% endhighlight %} + +

+ + Broadcasting
DataStream → DataStream @@ -1357,7 +1402,7 @@ dataStream.partitionCustom(partitioner, 0)

Partitions elements randomly according to a uniform distribution. {% highlight scala %} -dataStream.partitionRandom() +dataStream.shuffle() {% endhighlight %}

@@ -1374,6 +1419,51 @@ dataStream.rebalance()

+ + Rescaling
DataStream → DataStream + +

+ Partitions elements, round-robin, to a subset of downstream operations. This is + useful if you want to have pipelines where you, for example, fan out from + each parallel instance of a source to a subset of several mappers to distribute load + but don't want the full rebalance that rebalance() would incur. This would require only + local data transfers instead of transferring data over network, depending on + other configuration values such as the number of slots of TaskManagers. +

+

+ The subset of downstream operations to which the upstream operation sends + elements depends on the degree of parallelism of both the upstream and downstream operation. + For example, if the upstream operation has parallelism 2 and the downstream operation + has parallelism 4, then one upstream operation would distribute elements to two + downstream operations while the other upstream operation would distribute to the other + two downstream operations. If, on the other hand, the downstream operation has parallelism + 2 while the upstream operation has parallelism 4 then two upstream operations would + distribute to one downstream operation while the other two upstream operations would + distribute to the other downstream operations. +

+

+ In cases where the different parallelisms are not multiples of each other one or several + downstream operations will have a differing number of inputs from upstream operations. + +

+

+ Please see this figure for a visualization of the connection pattern in the above + example: +

+ +
+ Checkpoint barriers in data streams +
+ + +

+ {% highlight java %} +dataStream.rescale() + {% endhighlight %} + +

+ + Broadcasting
DataStream → DataStream diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 6d2a44adaaff5f1964a00a0626e51b8947dd3ed3..891562c25e54780d1ea784574ed3a8891275905f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -77,6 +77,7 @@ import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; @@ -390,12 +391,8 @@ public class DataStream { } /** - * Sets the partitioning of the {@link DataStream} so that the output tuples - * are broadcasted to every parallel instance of the next component. - * - *

- * This setting only effects the how the outputs will be distributed between - * the parallel instances of the next processing operator. + * Sets the partitioning of the {@link DataStream} so that the output elements + * are broadcasted to every parallel instance of the next operation. * * @return The DataStream with broadcast partitioning set. */ @@ -404,12 +401,8 @@ public class DataStream { } /** - * Sets the partitioning of the {@link DataStream} so that the output tuples - * are shuffled uniformly randomly to the next component. - * - *

- * This setting only effects the how the outputs will be distributed between - * the parallel instances of the next processing operator. + * Sets the partitioning of the {@link DataStream} so that the output elements + * are shuffled uniformly randomly to the next operation. * * @return The DataStream with shuffle partitioning set. */ @@ -419,13 +412,8 @@ public class DataStream { } /** - * Sets the partitioning of the {@link DataStream} so that the output tuples - * are forwarded to the local subtask of the next component (whenever - * possible). - * - *

- * This setting only effects the how the outputs will be distributed between - * the parallel instances of the next processing operator. + * Sets the partitioning of the {@link DataStream} so that the output elements + * are forwarded to the local subtask of the next operation. * * @return The DataStream with forward partitioning set. */ @@ -435,20 +423,41 @@ public class DataStream { } /** - * Sets the partitioning of the {@link DataStream} so that the output tuples - * are distributed evenly to instances of the next component in a Round-robin + * Sets the partitioning of the {@link DataStream} so that the output elements + * are distributed evenly to instances of the next operation in a round-robin * fashion. * - *

- * This setting only effects the how the outputs will be distributed between - * the parallel instances of the next processing operator. - * * @return The DataStream with rebalance partitioning set. */ public DataStream rebalance() { return setConnectionType(new RebalancePartitioner()); } + /** + * Sets the partitioning of the {@link DataStream} so that the output elements + * are distributed evenly to a subset of instances of the next operation in a round-robin + * fashion. + * + *

The subset of downstream operations to which the upstream operation sends + * elements depends on the degree of parallelism of both the upstream and downstream operation. + * For example, if the upstream operation has parallelism 2 and the downstream operation + * has parallelism 4, then one upstream operation would distribute elements to two + * downstream operations while the other upstream operation would distribute to the other + * two downstream operations. If, on the other hand, the downstream operation has parallelism + * 2 while the upstream operation has parallelism 4 then two upstream operations will + * distribute to one downstream operation while the other two upstream operations will + * distribute to the other downstream operations. + * + *

In cases where the different parallelisms are not multiples of each other one or several + * downstream operations will have a differing number of inputs from upstream operations. + * + * @return The DataStream with rescale partitioning set. + */ + @Experimental + public DataStream rescale() { + return setConnectionType(new RescalePartitioner()); + } + /** * Sets the partitioning of the {@link DataStream} so that the output values * all go to the first instance of the next processing operator. Use this diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java index c2fcaaf82d70204110af51cf9be066d385459c06..0aef7f8e0bbfa268ee801a772e10e6d8adfe8843 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java @@ -114,28 +114,40 @@ public class SingleOutputStreamOperator broadcast() { return (SingleOutputStreamOperator) super.broadcast(); } @SuppressWarnings("unchecked") + @Override @Experimental public SingleOutputStreamOperator shuffle() { return (SingleOutputStreamOperator) super.shuffle(); } @SuppressWarnings("unchecked") + @Override @Experimental public SingleOutputStreamOperator forward() { return (SingleOutputStreamOperator) super.forward(); } @SuppressWarnings("unchecked") + @Override public SingleOutputStreamOperator rebalance() { return (SingleOutputStreamOperator) super.rebalance(); } @SuppressWarnings("unchecked") + @Override + @Experimental + public SingleOutputStreamOperator rescale() { + return (SingleOutputStreamOperator) super.rescale(); + } + + @SuppressWarnings("unchecked") + @Override @Experimental public SingleOutputStreamOperator global() { return (SingleOutputStreamOperator) super.global(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 5da2caaf2f03b0c7ddd9e68e95fe8fb3a034eef0..fd75ba727878485bdba82ef97cf4c9b389114573 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.StreamOperator; import org.apache.flink.streaming.api.transformations.StreamTransformation; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; +import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.tasks.StreamIterationHead; import org.apache.flink.streaming.runtime.tasks.StreamIterationTail; @@ -361,10 +362,16 @@ public class StreamingJobGraphGenerator { StreamPartitioner partitioner = edge.getPartitioner(); if (partitioner instanceof ForwardPartitioner) { downStreamVertex.connectNewDataSetAsInput( - headVertex, - DistributionPattern.POINTWISE, - ResultPartitionType.PIPELINED, - true); + headVertex, + DistributionPattern.POINTWISE, + ResultPartitionType.PIPELINED, + true); + } else if (partitioner instanceof RescalePartitioner){ + downStreamVertex.connectNewDataSetAsInput( + headVertex, + DistributionPattern.POINTWISE, + ResultPartitionType.PIPELINED, + true); } else { downStreamVertex.connectNewDataSetAsInput( headVertex, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java new file mode 100644 index 0000000000000000000000000000000000000000..063e64a53a81dffc069779e20b4b799c811b83e3 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitioner.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.partitioner; + +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +/** + * Partitioner that distributes the data equally by cycling through the output + * channels. This distributes only to a subset of downstream nodes because + * {@link org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator} instantiates + * a {@link DistributionPattern#POINTWISE} distribution pattern when encountering + * {@code SemiRebalancePartitioner}. + * + *

The subset of downstream operations to which the upstream operation sends + * elements depends on the degree of parallelism of both the upstream and downstream operation. + * For example, if the upstream operation has parallelism 2 and the downstream operation + * has parallelism 4, then one upstream operation would distribute elements to two + * downstream operations while the other upstream operation would distribute to the other + * two downstream operations. If, on the other hand, the downstream operation has parallelism + * 2 while the upstream operation has parallelism 4 then two upstream operations will + * distribute to one downstream operation while the other two upstream operations will + * distribute to the other downstream operations. + * + *

In cases where the different parallelisms are not multiples of each other one or several + * downstream operations will have a differing number of inputs from upstream operations. + * + * @param Type of the elements in the Stream being rescaled + */ +public class RescalePartitioner extends StreamPartitioner { + private static final long serialVersionUID = 1L; + + private int[] returnArray = new int[] {-1}; + + @Override + public int[] selectChannels(SerializationDelegate> record, int numberOfOutputChannels) { + this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels; + return this.returnArray; + } + + public StreamPartitioner copy() { + return this; + } + + @Override + public String toString() { + return "RESCALE"; + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..bac7fa5edfb8b99fd9e61b78a8c06f2a72f88c0c --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/RescalePartitionerTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.partitioner; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.executiongraph.ExecutionEdge; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.Collector; +import org.apache.flink.util.TestLogger; +import org.junit.Before; +import org.junit.Test; + +import java.net.URL; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.*; + +public class RescalePartitionerTest extends TestLogger { + + private RescalePartitioner distributePartitioner; + private StreamRecord streamRecord = new StreamRecord(null); + private SerializationDelegate> sd = new SerializationDelegate>( + null); + + @Before + public void setPartitioner() { + distributePartitioner = new RescalePartitioner(); + } + + @Test + public void testSelectChannelsLength() { + sd.setInstance(streamRecord); + assertEquals(1, distributePartitioner.selectChannels(sd, 1).length); + assertEquals(1, distributePartitioner.selectChannels(sd, 2).length); + assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length); + } + + @Test + public void testSelectChannelsInterval() { + sd.setInstance(streamRecord); + assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]); + assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]); + assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]); + assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]); + } + + @Test + public void testExecutionGraphGeneration() { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setParallelism(4); + + // get input data + DataStream text = env.addSource(new ParallelSourceFunction() { + @Override + public void run(SourceContext ctx) throws Exception { + + } + + @Override + public void cancel() { + + } + }).setParallelism(2); + + DataStream> counts = text + .rescale() + .flatMap(new FlatMapFunction>() { + @Override + public void flatMap(String value, + Collector> out) throws Exception { + + } + }); + + counts.rescale().print().setParallelism(2); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + + final JobID jobId = new JobID(); + final String jobName = "Semi-Rebalance Test Job"; + final Configuration cfg = new Configuration(); + + List jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources(); + + JobVertex sourceVertex = jobVertices.get(0); + JobVertex mapVertex = jobVertices.get(1); + JobVertex sinkVertex = jobVertices.get(2); + + assertEquals(2, sourceVertex.getParallelism()); + assertEquals(4, mapVertex.getParallelism()); + assertEquals(2, sinkVertex.getParallelism()); + + ExecutionGraph eg = new ExecutionGraph( + TestingUtils.defaultExecutionContext(), + jobId, + jobName, + cfg, + AkkaUtils.getDefaultTimeout(),new ArrayList(), new ArrayList(), ExecutionGraph.class.getClassLoader()); + try { + eg.attachJobGraph(jobVertices); + } + catch (JobException e) { + e.printStackTrace(); + fail("Building ExecutionGraph failed: " + e.getMessage()); + } + + + ExecutionJobVertex execSourceVertex = eg.getJobVertex(sourceVertex.getID()); + ExecutionJobVertex execMapVertex= eg.getJobVertex(mapVertex.getID()); + ExecutionJobVertex execSinkVertex= eg.getJobVertex(sinkVertex.getID()); + + assertEquals(0, execSourceVertex.getInputs().size()); + + assertEquals(1, execMapVertex.getInputs().size()); + assertEquals(4, execMapVertex.getParallelism()); + ExecutionVertex[] mapTaskVertices = execMapVertex.getTaskVertices(); + + // verify that we have each parallel input partition exactly twice, i.e. that one source + // sends to two unique mappers + Map mapInputPartitionCounts = new HashMap<>(); + for (ExecutionVertex mapTaskVertex: mapTaskVertices) { + assertEquals(1, mapTaskVertex.getNumberOfInputs()); + assertEquals(1, mapTaskVertex.getInputEdges(0).length); + ExecutionEdge inputEdge = mapTaskVertex.getInputEdges(0)[0]; + assertEquals(sourceVertex.getID(), inputEdge.getSource().getProducer().getJobvertexId()); + int inputPartition = inputEdge.getSource().getPartitionNumber(); + if (!mapInputPartitionCounts.containsKey(inputPartition)) { + mapInputPartitionCounts.put(inputPartition, 1); + } else { + mapInputPartitionCounts.put(inputPartition, mapInputPartitionCounts.get(inputPartition) + 1); + } + } + + assertEquals(2, mapInputPartitionCounts.size()); + for (int count: mapInputPartitionCounts.values()) { + assertEquals(2, count); + } + + assertEquals(1, execSinkVertex.getInputs().size()); + assertEquals(2, execSinkVertex.getParallelism()); + ExecutionVertex[] sinkTaskVertices = execSinkVertex.getTaskVertices(); + + // verify each sink instance has two inputs from the map and that each map subpartition + // only occurs in one unique input edge + Set mapSubpartitions = new HashSet<>(); + for (ExecutionVertex sinkTaskVertex: sinkTaskVertices) { + assertEquals(1, sinkTaskVertex.getNumberOfInputs()); + assertEquals(2, sinkTaskVertex.getInputEdges(0).length); + ExecutionEdge inputEdge1 = sinkTaskVertex.getInputEdges(0)[0]; + ExecutionEdge inputEdge2 = sinkTaskVertex.getInputEdges(0)[1]; + assertEquals(mapVertex.getID(), inputEdge1.getSource().getProducer().getJobvertexId()); + assertEquals(mapVertex.getID(), inputEdge2.getSource().getProducer().getJobvertexId()); + + int inputPartition1 = inputEdge1.getSource().getPartitionNumber(); + assertFalse(mapSubpartitions.contains(inputPartition1)); + mapSubpartitions.add(inputPartition1); + int inputPartition2 = inputEdge2.getSource().getPartitionNumber(); + assertFalse(mapSubpartitions.contains(inputPartition2)); + mapSubpartitions.add(inputPartition2); + } + + assertEquals(4, mapSubpartitions.size()); + } +} diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index d3bc463495245446d3c96ec00ec9f31acfc83b33..3fe55c41eae11d769eb14ea6e12edac15918562d 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -300,7 +300,7 @@ class DataStream[T](stream: JavaStream[T]) { * Partitions a tuple DataStream on the specified key fields using a custom partitioner. * This method takes the key position to partition on, and a partitioner that accepts the key * type. - *

+ * * Note: This method works only on single field keys. */ def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] = @@ -310,7 +310,7 @@ class DataStream[T](stream: JavaStream[T]) { * Partitions a POJO DataStream on the specified key fields using a custom partitioner. * This method takes the key expression to partition on, and a partitioner that accepts the key * type. - *

+ * * Note: This method works only on single field keys. */ def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String) @@ -320,7 +320,7 @@ class DataStream[T](stream: JavaStream[T]) { * Partitions a DataStream on the key returned by the selector, using a custom partitioner. * This method takes the key selector to get the key to partition on, and a partitioner that * accepts the key type. - *

+ * * Note: This method works only on single field keys, i.e. the selector cannot return tuples * of fields. */ @@ -336,10 +336,7 @@ class DataStream[T](stream: JavaStream[T]) { /** * Sets the partitioning of the DataStream so that the output tuples - * are broad casted to every parallel instance of the next component. This - * setting only effects the how the outputs will be distributed between the - * parallel instances of the next processing operator. - * + * are broad casted to every parallel instance of the next component. */ def broadcast: DataStream[T] = stream.broadcast() @@ -353,10 +350,7 @@ class DataStream[T](stream: JavaStream[T]) { /** * Sets the partitioning of the DataStream so that the output tuples - * are shuffled to the next component. This setting only effects the how the - * outputs will be distributed between the parallel instances of the next - * processing operator. - * + * are shuffled to the next component. */ @Experimental def shuffle: DataStream[T] = stream.shuffle() @@ -364,39 +358,53 @@ class DataStream[T](stream: JavaStream[T]) { /** * Sets the partitioning of the DataStream so that the output tuples * are forwarded to the local subtask of the next component (whenever - * possible). This is the default partitioner setting. This setting only - * effects the how the outputs will be distributed between the parallel - * instances of the next processing operator. - * + * possible). */ @Experimental def forward: DataStream[T] = stream.forward() /** * Sets the partitioning of the DataStream so that the output tuples - * are distributed evenly to the next component.This setting only effects - * the how the outputs will be distributed between the parallel instances of - * the next processing operator. - * + * are distributed evenly to the next component. */ def rebalance: DataStream[T] = stream.rebalance() + /** + * Sets the partitioning of the [[DataStream]] so that the output tuples + * are distributed evenly to a subset of instances of the downstream operation. + * + * The subset of downstream operations to which the upstream operation sends + * elements depends on the degree of parallelism of both the upstream and downstream operation. + * For example, if the upstream operation has parallelism 2 and the downstream operation + * has parallelism 4, then one upstream operation would distribute elements to two + * downstream operations while the other upstream operation would distribute to the other + * two downstream operations. If, on the other hand, the downstream operation has parallelism + * 2 while the upstream operation has parallelism 4 then two upstream operations will + * distribute to one downstream operation while the other two upstream operations will + * distribute to the other downstream operations. + * + * In cases where the different parallelisms are not multiples of each other one or several + * downstream operations will have a differing number of inputs from upstream operations. + */ + @Experimental + def rescale: DataStream[T] = stream.rescale() + /** * Initiates an iterative part of the program that creates a loop by feeding * back data streams. To create a streaming iteration the user needs to define * a transformation that creates two DataStreams. The first one is the output * that will be fed back to the start of the iteration and the second is the output * stream of the iterative part. - *

+ * * stepfunction: initialStream => (feedback, output) - *

+ * * A common pattern is to use output splitting to create feedback and output DataStream. * Please refer to the .split(...) method of the DataStream - *

+ * * By default a DataStream with iteration will never terminate, but the user * can use the maxWaitTime parameter to set a max waiting time for the iteration head. * If no data received in the set time the stream terminates. - *

+ * * By default the feedback partitioning is set to match the input, to override this set * the keepPartitioning flag to true * @@ -424,9 +432,8 @@ class DataStream[T](stream: JavaStream[T]) { * * This allows the user to distinguish standard input from feedback inputs. * - *

* stepfunction: initialStream => (feedback, output) - *

+ * * The user must set the max waiting time for the iteration head. * If no data received in the set time the stream terminates. If this parameter is set * to 0 then the iteration sources will indefinitely, so the job must be killed to stop.