Commit 9f6a8b6d authored by Aljoscha Krettek

[FLINK-3336] Add Rescale Data Shipping for DataStream

This is the Javadoc of DataStream.rescale() that describes the
behaviour:

Sets the partitioning of the {@link DataStream} so that the output elements
are distributed evenly to a subset of instances of the next operation in a round-robin
fashion.

The subset of downstream operations to which the upstream operation sends
elements depends on the degree of parallelism of both the upstream and downstream operation.
For example, if the upstream operation has parallelism 2 and the downstream operation
has parallelism 4, then one upstream operation would distribute elements to two
downstream operations while the other upstream operation would distribute to the other
two downstream operations. If, on the other hand, the downstream operation has parallelism
2 while the upstream operation has parallelism 4 then two upstream operations will
distribute to one downstream operation while the other two upstream operations will
distribute to the other downstream operation.

In cases where the parallelisms are not multiples of each other, one or several
downstream operations will have a differing number of inputs from upstream operations.
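
As an illustration of the mapping described above, here is a minimal sketch.
It is not Flink's actual wiring code: it assumes that subtasks are grouped
into contiguous blocks, and the class RescaleWiringSketch and the method
targetsOf are hypothetical names introduced only for this example.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of Flink: models a contiguous pointwise wiring. */
class RescaleWiringSketch {

    /** Returns the downstream subtask indices that one upstream subtask sends to. */
    static List<Integer> targetsOf(int upstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> targets = new ArrayList<>();
        if (upstreamParallelism >= downstreamParallelism) {
            // Several upstream subtasks share a single downstream subtask.
            targets.add(upstreamIndex * downstreamParallelism / upstreamParallelism);
        } else {
            // One upstream subtask fans out to a contiguous block of downstream subtasks.
            for (int d = 0; d < downstreamParallelism; d++) {
                if (d * upstreamParallelism / downstreamParallelism == upstreamIndex) {
                    targets.add(d);
                }
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        for (int u = 0; u < 2; u++) {
            System.out.println("2 -> 4, upstream " + u + " sends to " + targetsOf(u, 2, 4));
        }
        for (int u = 0; u < 4; u++) {
            System.out.println("4 -> 2, upstream " + u + " sends to " + targetsOf(u, 4, 2));
        }
    }
}
```

Under these assumptions, parallelism 2 to 4 gives each upstream subtask two
targets, and parallelism 4 to 2 pairs two upstream subtasks with each
downstream subtask, which matches the example in the Javadoc.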
Parent 7469c17c
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<svg
xmlns:dc="http://purl.org/dc/elements/1.1/"
xmlns:cc="http://creativecommons.org/ns#"
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:svg="http://www.w3.org/2000/svg"
xmlns="http://www.w3.org/2000/svg"
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
width="372.04724"
height="262.20471"
id="svg2"
version="1.1"
inkscape:version="0.48.5 r10040"
sodipodi:docname="New document 1">
<defs
id="defs4">
<marker
inkscape:stockid="TriangleOutM"
orient="auto"
refY="0.0"
refX="0.0"
id="TriangleOutM"
style="overflow:visible">
<path
id="path5012"
d="M 5.77,0.0 L -2.88,5.0 L -2.88,-5.0 L 5.77,0.0 z "
style="fill-rule:evenodd;stroke:#000000;stroke-width:1.0pt"
transform="scale(0.4)" />
</marker>
<marker
inkscape:stockid="TriangleOutL"
orient="auto"
refY="0.0"
refX="0.0"
id="TriangleOutL"
style="overflow:visible">
<path
id="path5009"
d="M 5.77,0.0 L -2.88,5.0 L -2.88,-5.0 L 5.77,0.0 z "
style="fill-rule:evenodd;stroke:#000000;stroke-width:1.0pt"
transform="scale(0.8)" />
</marker>
<marker
inkscape:stockid="Arrow2Lend"
orient="auto"
refY="0.0"
refX="0.0"
id="Arrow2Lend"
style="overflow:visible;">
<path
id="path4888"
style="fill-rule:evenodd;stroke-width:0.62500000;stroke-linejoin:round;"
d="M 8.7185878,4.0337352 L -2.2072895,0.016013256 L 8.7185884,-4.0017078 C 6.9730900,-1.6296469 6.9831476,1.6157441 8.7185878,4.0337352 z "
transform="scale(1.1) rotate(180) translate(1,0)" />
</marker>
</defs>
<sodipodi:namedview
id="base"
pagecolor="#ffffff"
bordercolor="#666666"
borderopacity="1.0"
inkscape:pageopacity="0.0"
inkscape:pageshadow="2"
inkscape:zoom="2.8284271"
inkscape:cx="23.967455"
inkscape:cy="142.96627"
inkscape:document-units="px"
inkscape:current-layer="layer1"
showgrid="false"
inkscape:window-width="2560"
inkscape:window-height="1391"
inkscape:window-x="0"
inkscape:window-y="1"
inkscape:window-maximized="1" />
<metadata
id="metadata7">
<rdf:RDF>
<cc:Work
rdf:about="">
<dc:format>image/svg+xml</dc:format>
<dc:type
rdf:resource="http://purl.org/dc/dcmitype/StillImage" />
<dc:title></dc:title>
</cc:Work>
</rdf:RDF>
</metadata>
<g
inkscape:label="Layer 1"
inkscape:groupmode="layer"
id="layer1"
transform="translate(0,-790.15744)">
<g
id="g6997"
transform="translate(0,0.17677669)">
<g
transform="translate(21.79899,7.2928933)"
id="g4835">
<g
id="g4721"
transform="translate(-2.6011424,-1.5258789e-5)">
<path
transform="matrix(1.1448338,0,0,1.1448338,-9.9783931,783.57046)"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
sodipodi:ry="14.520943"
sodipodi:rx="14.520943"
sodipodi:cy="60.300472"
sodipodi:cx="83.716393"
id="path3885"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
sodipodi:type="arc" />
<text
sodipodi:linespacing="125%"
id="text4681"
y="856.37225"
x="75.488594"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
xml:space="preserve"><tspan
style="font-size:13px"
y="856.37225"
x="75.488594"
id="tspan4683"
sodipodi:role="line">Src</tspan></text>
</g>
<g
id="g4731"
transform="translate(8.7630735,76.367533)">
<g
id="g4718">
<path
transform="matrix(1.1448338,0,0,1.1448338,-21.342609,840.64408)"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
sodipodi:ry="14.520943"
sodipodi:rx="14.520943"
sodipodi:cy="60.300472"
sodipodi:cx="83.716393"
id="path4710"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
sodipodi:type="arc" />
</g>
<text
xml:space="preserve"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
x="62.881989"
y="914.22382"
id="text4687"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan4689"
x="62.881989"
y="914.22382"
style="font-size:13px">Snk</tspan></text>
</g>
<g
id="g4753">
<g
transform="translate(-67.755979,8.3842533)"
id="g4726">
<path
sodipodi:type="arc"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
id="path4712"
sodipodi:cx="83.716393"
sodipodi:cy="60.300472"
sodipodi:rx="14.520943"
sodipodi:ry="14.520943"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)" />
<text
xml:space="preserve"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
x="137.40105"
y="915.48651"
id="text4714"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan4716"
x="137.40105"
y="915.48651"
style="font-size:13px">Map</tspan></text>
</g>
<g
id="g4737"
transform="translate(-119.198,8.3842533)">
<path
transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
sodipodi:ry="14.520943"
sodipodi:rx="14.520943"
sodipodi:cy="60.300472"
sodipodi:cx="83.716393"
id="path4739"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
sodipodi:type="arc" />
<text
sodipodi:linespacing="125%"
id="text4741"
y="915.48651"
x="137.40105"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
xml:space="preserve"><tspan
style="font-size:13px"
y="915.48651"
x="137.40105"
id="tspan4743"
sodipodi:role="line">Map</tspan></text>
</g>
<g
transform="translate(-16.313963,8.3842533)"
id="g4745">
<path
sodipodi:type="arc"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
id="path4747"
sodipodi:cx="83.716393"
sodipodi:cy="60.300472"
sodipodi:rx="14.520943"
sodipodi:ry="14.520943"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)" />
<text
xml:space="preserve"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
x="137.40105"
y="915.48651"
id="text4749"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan4751"
x="137.40105"
y="915.48651"
style="font-size:13px">Map</tspan></text>
</g>
</g>
</g>
<path
inkscape:connector-curvature="0"
id="path4861"
d="M 97.248545,875.21054 54.181606,908.06852"
style="fill:none;stroke:#000000;stroke-width:0.95726824px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
<path
transform="translate(0,790.15744)"
inkscape:connector-curvature="0"
id="path5871"
d="m 104.6518,87.195784 -0.35355,31.112696"
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-mid:none;marker-end:url(#TriangleOutM)" />
<path
transform="translate(0,790.15744)"
inkscape:connector-curvature="0"
id="path6245"
d="m 113.49064,84.72091 41.36575,33.58757"
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
<path
transform="translate(0,790.15744)"
inkscape:connector-curvature="0"
id="path6433"
d="m 53.033009,154.37093 43.84062,32.17336"
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
<path
transform="translate(0,790.15744)"
inkscape:connector-curvature="0"
id="path6621"
d="m 104.29825,153.66382 0.35355,31.46625"
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
<path
transform="translate(0,790.15744)"
inkscape:connector-curvature="0"
id="path6809"
d="m 155.56349,154.01737 -41.7193,32.52692"
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)" />
</g>
<g
id="g7028"
transform="translate(157.92745,0.17677669)">
<g
id="g7030"
transform="translate(21.79899,7.2928933)">
<g
transform="translate(-2.6011424,-1.5258789e-5)"
id="g7032">
<path
sodipodi:type="arc"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
id="path7034"
sodipodi:cx="83.716393"
sodipodi:cy="60.300472"
sodipodi:rx="14.520943"
sodipodi:ry="14.520943"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
transform="matrix(1.1448338,0,0,1.1448338,-9.9783931,783.57046)" />
<text
xml:space="preserve"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
x="75.488594"
y="856.37225"
id="text7036"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan7038"
x="75.488594"
y="856.37225"
style="font-size:13px">Src</tspan></text>
</g>
<g
transform="translate(8.7630735,76.367533)"
id="g7040">
<g
id="g7042">
<path
sodipodi:type="arc"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
id="path7044"
sodipodi:cx="83.716393"
sodipodi:cy="60.300472"
sodipodi:rx="14.520943"
sodipodi:ry="14.520943"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
transform="matrix(1.1448338,0,0,1.1448338,-21.342609,840.64408)" />
</g>
<text
sodipodi:linespacing="125%"
id="text7046"
y="914.22382"
x="62.881989"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
xml:space="preserve"><tspan
style="font-size:13px"
y="914.22382"
x="62.881989"
id="tspan7048"
sodipodi:role="line">Snk</tspan></text>
</g>
<g
id="g7050">
<g
id="g7052"
transform="translate(-67.755979,8.3842533)">
<path
transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
sodipodi:ry="14.520943"
sodipodi:rx="14.520943"
sodipodi:cy="60.300472"
sodipodi:cx="83.716393"
id="path7054"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
sodipodi:type="arc" />
<text
sodipodi:linespacing="125%"
id="text7056"
y="915.48651"
x="137.40105"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
xml:space="preserve"><tspan
style="font-size:13px"
y="915.48651"
x="137.40105"
id="tspan7058"
sodipodi:role="line">Map</tspan></text>
</g>
<g
transform="translate(-119.198,8.3842533)"
id="g7060">
<path
sodipodi:type="arc"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
id="path7062"
sodipodi:cx="83.716393"
sodipodi:cy="60.300472"
sodipodi:rx="14.520943"
sodipodi:ry="14.520943"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)" />
<text
xml:space="preserve"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
x="137.40105"
y="915.48651"
id="text7064"
sodipodi:linespacing="125%"><tspan
sodipodi:role="line"
id="tspan7066"
x="137.40105"
y="915.48651"
style="font-size:13px">Map</tspan></text>
</g>
<g
id="g7068"
transform="translate(-16.313963,8.3842533)">
<path
transform="matrix(1.1448338,0,0,1.1448338,55.176446,841.90677)"
d="m 98.237335,60.300472 c 0,8.019695 -6.501247,14.520943 -14.520942,14.520943 -8.019696,0 -14.520943,-6.501248 -14.520943,-14.520943 0,-8.019695 6.501247,-14.520942 14.520943,-14.520942 8.019695,0 14.520942,6.501247 14.520942,14.520942 z"
sodipodi:ry="14.520943"
sodipodi:rx="14.520943"
sodipodi:cy="60.300472"
sodipodi:cx="83.716393"
id="path7070"
style="fill:#f5a030;fill-opacity:1;fill-rule:evenodd;stroke:#000000;stroke-width:0.60000002;stroke-linecap:butt;stroke-linejoin:miter;stroke-miterlimit:4;stroke-opacity:1;stroke-dasharray:none"
sodipodi:type="arc" />
<text
sodipodi:linespacing="125%"
id="text7072"
y="915.48651"
x="137.40105"
style="font-size:40px;font-style:normal;font-weight:normal;line-height:125%;letter-spacing:0px;word-spacing:0px;fill:#000000;fill-opacity:1;stroke:none;font-family:Sans"
xml:space="preserve"><tspan
style="font-size:13px"
y="915.48651"
x="137.40105"
id="tspan7074"
sodipodi:role="line">Map</tspan></text>
</g>
</g>
</g>
<path
style="fill:none;stroke:#000000;stroke-width:0.95726824px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
d="M 97.248545,875.21054 54.181606,908.06852"
id="path7076"
inkscape:connector-curvature="0" />
<path
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-mid:none;marker-end:url(#TriangleOutM)"
d="m 104.6518,87.195784 -0.35355,31.112696"
id="path7078"
inkscape:connector-curvature="0"
transform="translate(0,790.15744)" />
<path
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
d="m 113.49064,84.72091 41.36575,33.58757"
id="path7080"
inkscape:connector-curvature="0"
transform="translate(0,790.15744)" />
<path
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
d="m 53.033009,154.37093 43.84062,32.17336"
id="path7082"
inkscape:connector-curvature="0"
transform="translate(0,790.15744)" />
<path
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
d="m 104.29825,153.66382 0.35355,31.46625"
id="path7084"
inkscape:connector-curvature="0"
transform="translate(0,790.15744)" />
<path
style="fill:none;stroke:#000000;stroke-width:1px;stroke-linecap:butt;stroke-linejoin:miter;stroke-opacity:1;marker-end:url(#TriangleOutM)"
d="m 155.56349,154.01737 -41.7193,32.52692"
id="path7086"
inkscape:connector-curvature="0"
transform="translate(0,790.15744)" />
</g>
</g>
</svg>
......@@ -1270,8 +1270,8 @@ dataStream.partitionByHash(0);
<p>
Uses a user-defined Partitioner to select the target task for each element.
{% highlight java %}
dataStream.partitionCustom(new Partitioner(){...}, "someKey");
dataStream.partitionCustom(new Partitioner(){...}, 0);
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0);
{% endhighlight %}
</p>
</td>
......@@ -1282,7 +1282,7 @@ dataStream.partitionCustom(new Partitioner(){...}, 0);
<p>
Partitions elements randomly according to a uniform distribution.
{% highlight java %}
dataStream.partitionRandom();
dataStream.shuffle();
{% endhighlight %}
</p>
</td>
......@@ -1299,6 +1299,51 @@ dataStream.rebalance();
</p>
</td>
</tr>
<tr>
<td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
<td>
<p>
Partitions elements, round-robin, to a subset of downstream operations. This is
useful if you want pipelines in which, for example, each parallel instance of a
source fans out to a subset of several mappers to distribute load, without the
full rebalance that rebalance() would incur. Depending on other configuration
values, such as the number of slots of the TaskManagers, this may require only
local data transfers instead of transfers over the network.
</p>
<p>
The subset of downstream operations to which the upstream operation sends
elements depends on the degree of parallelism of both the upstream and downstream operation.
For example, if the upstream operation has parallelism 2 and the downstream operation
has parallelism 4, then one upstream operation would distribute elements to two
downstream operations while the other upstream operation would distribute to the other
two downstream operations. If, on the other hand, the downstream operation has parallelism
2 while the upstream operation has parallelism 4 then two upstream operations would
distribute to one downstream operation while the other two upstream operations would
distribute to the other downstream operation.
</p>
<p>
In cases where the parallelisms are not multiples of each other, one or several
downstream operations will have a differing number of inputs from upstream operations.
</p>
<p>
Please see this figure for a visualization of the connection pattern in the above
example:
</p>
<div style="text-align: center">
<img src="{{ site.baseurl }}/apis/streaming/fig/rescale.svg" alt="Connection pattern of rescale partitioning" />
</div>
<p>
{% highlight java %}
dataStream.rescale();
{% endhighlight %}
</p>
</td>
</tr>
<tr>
<td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
<td>
......@@ -1357,7 +1402,7 @@ dataStream.partitionCustom(partitioner, 0)
<p>
Partitions elements randomly according to a uniform distribution.
{% highlight scala %}
dataStream.partitionRandom()
dataStream.shuffle()
{% endhighlight %}
</p>
</td>
......@@ -1374,6 +1419,51 @@ dataStream.rebalance()
</p>
</td>
</tr>
<tr>
<td><strong>Rescaling</strong><br>DataStream &rarr; DataStream</td>
<td>
<p>
Partitions elements, round-robin, to a subset of downstream operations. This is
useful if you want pipelines in which, for example, each parallel instance of a
source fans out to a subset of several mappers to distribute load, without the
full rebalance that rebalance() would incur. Depending on other configuration
values, such as the number of slots of the TaskManagers, this may require only
local data transfers instead of transfers over the network.
</p>
<p>
The subset of downstream operations to which the upstream operation sends
elements depends on the degree of parallelism of both the upstream and downstream operation.
For example, if the upstream operation has parallelism 2 and the downstream operation
has parallelism 4, then one upstream operation would distribute elements to two
downstream operations while the other upstream operation would distribute to the other
two downstream operations. If, on the other hand, the downstream operation has parallelism
2 while the upstream operation has parallelism 4 then two upstream operations would
distribute to one downstream operation while the other two upstream operations would
distribute to the other downstream operation.
</p>
<p>
In cases where the parallelisms are not multiples of each other, one or several
downstream operations will have a differing number of inputs from upstream operations.
</p>
<p>
Please see this figure for a visualization of the connection pattern in the above
example:
</p>
<div style="text-align: center">
<img src="{{ site.baseurl }}/apis/streaming/fig/rescale.svg" alt="Connection pattern of rescale partitioning" />
</div>
<p>
{% highlight scala %}
dataStream.rescale()
{% endhighlight %}
</p>
</td>
</tr>
<tr>
<td><strong>Broadcasting</strong><br>DataStream &rarr; DataStream</td>
<td>
......
......@@ -77,6 +77,7 @@ import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
......@@ -390,12 +391,8 @@ public class DataStream<T> {
}
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are broadcasted to every parallel instance of the next component.
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
* Sets the partitioning of the {@link DataStream} so that the output elements
* are broadcasted to every parallel instance of the next operation.
*
* @return The DataStream with broadcast partitioning set.
*/
......@@ -404,12 +401,8 @@ public class DataStream<T> {
}
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are shuffled uniformly randomly to the next component.
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
* Sets the partitioning of the {@link DataStream} so that the output elements
* are shuffled uniformly randomly to the next operation.
*
* @return The DataStream with shuffle partitioning set.
*/
......@@ -419,13 +412,8 @@ public class DataStream<T> {
}
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are forwarded to the local subtask of the next component (whenever
* possible).
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
* Sets the partitioning of the {@link DataStream} so that the output elements
* are forwarded to the local subtask of the next operation.
*
* @return The DataStream with forward partitioning set.
*/
......@@ -435,20 +423,41 @@ public class DataStream<T> {
}
/**
* Sets the partitioning of the {@link DataStream} so that the output tuples
* are distributed evenly to instances of the next component in a Round-robin
* Sets the partitioning of the {@link DataStream} so that the output elements
* are distributed evenly to instances of the next operation in a round-robin
* fashion.
*
* <p>
* This setting only effects the how the outputs will be distributed between
* the parallel instances of the next processing operator.
*
* @return The DataStream with rebalance partitioning set.
*/
public DataStream<T> rebalance() {
return setConnectionType(new RebalancePartitioner<T>());
}
/**
* Sets the partitioning of the {@link DataStream} so that the output elements
* are distributed evenly to a subset of instances of the next operation in a round-robin
* fashion.
*
* <p>The subset of downstream operations to which the upstream operation sends
* elements depends on the degree of parallelism of both the upstream and downstream operation.
* For example, if the upstream operation has parallelism 2 and the downstream operation
* has parallelism 4, then one upstream operation would distribute elements to two
* downstream operations while the other upstream operation would distribute to the other
* two downstream operations. If, on the other hand, the downstream operation has parallelism
* 2 while the upstream operation has parallelism 4 then two upstream operations will
* distribute to one downstream operation while the other two upstream operations will
* distribute to the other downstream operation.
*
* <p>In cases where the parallelisms are not multiples of each other, one or several
* downstream operations will have a differing number of inputs from upstream operations.
*
* @return The DataStream with rescale partitioning set.
*/
@Experimental
public DataStream<T> rescale() {
return setConnectionType(new RescalePartitioner<T>());
}
/**
* Sets the partitioning of the {@link DataStream} so that the output values
* all go to the first instance of the next processing operator. Use this
......
......@@ -114,28 +114,40 @@ public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<
}
@SuppressWarnings("unchecked")
@Override
public SingleOutputStreamOperator<T, O> broadcast() {
return (SingleOutputStreamOperator<T, O>) super.broadcast();
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public SingleOutputStreamOperator<T, O> shuffle() {
return (SingleOutputStreamOperator<T, O>) super.shuffle();
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public SingleOutputStreamOperator<T, O> forward() {
return (SingleOutputStreamOperator<T, O>) super.forward();
}
@SuppressWarnings("unchecked")
@Override
public SingleOutputStreamOperator<T, O> rebalance() {
return (SingleOutputStreamOperator<T, O>) super.rebalance();
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public SingleOutputStreamOperator<T, O> rescale() {
return (SingleOutputStreamOperator<T, O>) super.rescale();
}
@SuppressWarnings("unchecked")
@Override
@Experimental
public SingleOutputStreamOperator<T, O> global() {
return (SingleOutputStreamOperator<T, O>) super.global();
......
......@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
......@@ -365,6 +366,12 @@ public class StreamingJobGraphGenerator {
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED,
true);
} else if (partitioner instanceof RescalePartitioner){
downStreamVertex.connectNewDataSetAsInput(
headVertex,
DistributionPattern.POINTWISE,
ResultPartitionType.PIPELINED,
true);
} else {
downStreamVertex.connectNewDataSetAsInput(
headVertex,
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.partitioner;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
/**
* Partitioner that distributes the data equally by cycling through the output
* channels. This distributes only to a subset of downstream nodes because
* {@link org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator} instantiates
* a {@link DistributionPattern#POINTWISE} distribution pattern when encountering
* {@code RescalePartitioner}.
*
* <p>The subset of downstream operations to which the upstream operation sends
* elements depends on the degree of parallelism of both the upstream and downstream operation.
* For example, if the upstream operation has parallelism 2 and the downstream operation
* has parallelism 4, then one upstream operation would distribute elements to two
* downstream operations while the other upstream operation would distribute to the other
* two downstream operations. If, on the other hand, the downstream operation has parallelism
* 2 while the upstream operation has parallelism 4, then two upstream operations will
* distribute to one downstream operation while the other two upstream operations will
* distribute to the other downstream operation.
*
* <p>In cases where the different parallelisms are not multiples of each other, one or several
* downstream operations will have a differing number of inputs from upstream operations.
*
* @param <T> Type of the elements in the Stream being rescaled
*/
public class RescalePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    // Reused single-element result array; starts at -1 so the first record goes to channel 0.
    private int[] returnArray = new int[] {-1};

    @Override
    public int[] selectChannels(SerializationDelegate<StreamRecord<T>> record, int numberOfOutputChannels) {
        // Advance round-robin to the next output channel.
        this.returnArray[0] = (this.returnArray[0] + 1) % numberOfOutputChannels;
        return this.returnArray;
    }

    @Override
    public StreamPartitioner<T> copy() {
        return this;
    }

    @Override
    public String toString() {
        return "RESCALE";
    }
}
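
The round-robin step in `selectChannels` can be tried in isolation. The following is a minimal standalone sketch, not the Flink class: `RoundRobinSketch` and `selectChannel` are hypothetical names introduced here for illustration, and the sketch returns a plain `int` instead of the reused array.

```java
/** Standalone sketch of round-robin channel selection; not part of Flink. */
final class RoundRobinSketch {
    // Starts at -1 so that the first call selects channel 0.
    private int lastChannel = -1;

    /** Returns the next channel index, cycling through 0..numberOfOutputChannels-1. */
    int selectChannel(int numberOfOutputChannels) {
        lastChannel = (lastChannel + 1) % numberOfOutputChannels;
        return lastChannel;
    }

    public static void main(String[] args) {
        RoundRobinSketch sketch = new RoundRobinSketch();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            sb.append(sketch.selectChannel(3)).append(' ');
        }
        System.out.println(sb.toString().trim()); // prints "0 1 2 0"
    }
}
```

The partitioner itself only cycles through the channels it is given. That it reaches just a subset of downstream instances follows from the pointwise distribution pattern described in the Javadoc, not from this method.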
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.runtime.partitioner;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.Before;
import org.junit.Test;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.junit.Assert.*;
public class RescalePartitionerTest extends TestLogger {
private RescalePartitioner<Tuple> distributePartitioner;
private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
null);
@Before
public void setPartitioner() {
distributePartitioner = new RescalePartitioner<Tuple>();
}
@Test
public void testSelectChannelsLength() {
sd.setInstance(streamRecord);
assertEquals(1, distributePartitioner.selectChannels(sd, 1).length);
assertEquals(1, distributePartitioner.selectChannels(sd, 2).length);
assertEquals(1, distributePartitioner.selectChannels(sd, 1024).length);
}
@Test
public void testSelectChannelsInterval() {
sd.setInstance(streamRecord);
assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
assertEquals(1, distributePartitioner.selectChannels(sd, 3)[0]);
assertEquals(2, distributePartitioner.selectChannels(sd, 3)[0]);
assertEquals(0, distributePartitioner.selectChannels(sd, 3)[0]);
}
@Test
public void testExecutionGraphGeneration() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(4);
// get input data
DataStream<String> text = env.addSource(new ParallelSourceFunction<String>() {
@Override
public void run(SourceContext<String> ctx) throws Exception {
}
@Override
public void cancel() {
}
}).setParallelism(2);
DataStream<Tuple2<String, Integer>> counts = text
.rescale()
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value,
Collector<Tuple2<String, Integer>> out) throws Exception {
}
});
counts.rescale().print().setParallelism(2);
JobGraph jobGraph = env.getStreamGraph().getJobGraph();
final JobID jobId = new JobID();
final String jobName = "Rescale Test Job";
final Configuration cfg = new Configuration();
List<JobVertex> jobVertices = jobGraph.getVerticesSortedTopologicallyFromSources();
JobVertex sourceVertex = jobVertices.get(0);
JobVertex mapVertex = jobVertices.get(1);
JobVertex sinkVertex = jobVertices.get(2);
assertEquals(2, sourceVertex.getParallelism());
assertEquals(4, mapVertex.getParallelism());
assertEquals(2, sinkVertex.getParallelism());
ExecutionGraph eg = new ExecutionGraph(
TestingUtils.defaultExecutionContext(),
jobId,
jobName,
cfg,
AkkaUtils.getDefaultTimeout(),
new ArrayList<BlobKey>(),
new ArrayList<URL>(),
ExecutionGraph.class.getClassLoader());
try {
eg.attachJobGraph(jobVertices);
}
catch (JobException e) {
e.printStackTrace();
fail("Building ExecutionGraph failed: " + e.getMessage());
}
ExecutionJobVertex execSourceVertex = eg.getJobVertex(sourceVertex.getID());
ExecutionJobVertex execMapVertex = eg.getJobVertex(mapVertex.getID());
ExecutionJobVertex execSinkVertex = eg.getJobVertex(sinkVertex.getID());
assertEquals(0, execSourceVertex.getInputs().size());
assertEquals(1, execMapVertex.getInputs().size());
assertEquals(4, execMapVertex.getParallelism());
ExecutionVertex[] mapTaskVertices = execMapVertex.getTaskVertices();
// verify that we have each parallel input partition exactly twice, i.e. that one source
// sends to two unique mappers
Map<Integer, Integer> mapInputPartitionCounts = new HashMap<>();
for (ExecutionVertex mapTaskVertex: mapTaskVertices) {
assertEquals(1, mapTaskVertex.getNumberOfInputs());
assertEquals(1, mapTaskVertex.getInputEdges(0).length);
ExecutionEdge inputEdge = mapTaskVertex.getInputEdges(0)[0];
assertEquals(sourceVertex.getID(), inputEdge.getSource().getProducer().getJobvertexId());
int inputPartition = inputEdge.getSource().getPartitionNumber();
if (!mapInputPartitionCounts.containsKey(inputPartition)) {
mapInputPartitionCounts.put(inputPartition, 1);
} else {
mapInputPartitionCounts.put(inputPartition, mapInputPartitionCounts.get(inputPartition) + 1);
}
}
assertEquals(2, mapInputPartitionCounts.size());
for (int count: mapInputPartitionCounts.values()) {
assertEquals(2, count);
}
assertEquals(1, execSinkVertex.getInputs().size());
assertEquals(2, execSinkVertex.getParallelism());
ExecutionVertex[] sinkTaskVertices = execSinkVertex.getTaskVertices();
// verify each sink instance has two inputs from the map and that each map subpartition
// only occurs in one unique input edge
Set<Integer> mapSubpartitions = new HashSet<>();
for (ExecutionVertex sinkTaskVertex: sinkTaskVertices) {
assertEquals(1, sinkTaskVertex.getNumberOfInputs());
assertEquals(2, sinkTaskVertex.getInputEdges(0).length);
ExecutionEdge inputEdge1 = sinkTaskVertex.getInputEdges(0)[0];
ExecutionEdge inputEdge2 = sinkTaskVertex.getInputEdges(0)[1];
assertEquals(mapVertex.getID(), inputEdge1.getSource().getProducer().getJobvertexId());
assertEquals(mapVertex.getID(), inputEdge2.getSource().getProducer().getJobvertexId());
int inputPartition1 = inputEdge1.getSource().getPartitionNumber();
assertFalse(mapSubpartitions.contains(inputPartition1));
mapSubpartitions.add(inputPartition1);
int inputPartition2 = inputEdge2.getSource().getPartitionNumber();
assertFalse(mapSubpartitions.contains(inputPartition2));
mapSubpartitions.add(inputPartition2);
}
assertEquals(4, mapSubpartitions.size());
}
}
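
The wiring that `testExecutionGraphGeneration` checks (2 sources to 4 mappers to 2 sinks) can be illustrated with a small index calculation. This is a simplified sketch under the assumption that subtasks are grouped contiguously by index; it is not Flink's ExecutionGraph code, and `PointwiseSketch` and `inputsOf` are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified illustration of a pointwise upstream-to-downstream mapping; not Flink code. */
final class PointwiseSketch {

    /** Returns the upstream subtask indices assumed to feed the given downstream subtask. */
    static List<Integer> inputsOf(int downstreamIndex, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> inputs = new ArrayList<>();
        if (upstreamParallelism <= downstreamParallelism) {
            // Fewer (or equally many) upstream subtasks: each downstream subtask has one input.
            inputs.add(downstreamIndex * upstreamParallelism / downstreamParallelism);
        } else {
            // More upstream subtasks: collect every upstream subtask that maps to this one.
            for (int u = 0; u < upstreamParallelism; u++) {
                if (u * downstreamParallelism / upstreamParallelism == downstreamIndex) {
                    inputs.add(u);
                }
            }
        }
        return inputs;
    }

    public static void main(String[] args) {
        // Parallelisms that are not multiples of each other: 3 upstream, 2 downstream.
        for (int d = 0; d < 2; d++) {
            System.out.println("downstream " + d + " <- " + inputsOf(d, 3, 2));
        }
        // prints "downstream 0 <- [0, 1]" and then "downstream 1 <- [2]"
    }
}
```

With parallelism 2 to 4, each downstream subtask gets exactly one input and each upstream subtask appears twice. With 4 to 2, each downstream subtask gets two inputs and every upstream subtask appears once. These are the counts the test asserts. The 3 to 2 case in `main` shows the uneven number of inputs mentioned in the Javadoc.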
......@@ -300,7 +300,7 @@ class DataStream[T](stream: JavaStream[T]) {
* Partitions a tuple DataStream on the specified key fields using a custom partitioner.
* This method takes the key position to partition on, and a partitioner that accepts the key
* type.
* <p>
*
* Note: This method works only on single field keys.
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: Int) : DataStream[T] =
......@@ -310,7 +310,7 @@ class DataStream[T](stream: JavaStream[T]) {
* Partitions a POJO DataStream on the specified key fields using a custom partitioner.
* This method takes the key expression to partition on, and a partitioner that accepts the key
* type.
* <p>
*
* Note: This method works only on single field keys.
*/
def partitionCustom[K: TypeInformation](partitioner: Partitioner[K], field: String)
......@@ -320,7 +320,7 @@ class DataStream[T](stream: JavaStream[T]) {
* Partitions a DataStream on the key returned by the selector, using a custom partitioner.
* This method takes the key selector to get the key to partition on, and a partitioner that
* accepts the key type.
* <p>
*
* Note: This method works only on single field keys, i.e. the selector cannot return tuples
* of fields.
*/
......@@ -336,10 +336,7 @@ class DataStream[T](stream: JavaStream[T]) {
/**
* Sets the partitioning of the DataStream so that the output tuples
* are broad casted to every parallel instance of the next component. This
* setting only effects the how the outputs will be distributed between the
* parallel instances of the next processing operator.
*
* are broad casted to every parallel instance of the next component.
*/
def broadcast: DataStream[T] = stream.broadcast()
......@@ -353,10 +350,7 @@ class DataStream[T](stream: JavaStream[T]) {
/**
* Sets the partitioning of the DataStream so that the output tuples
* are shuffled to the next component. This setting only effects the how the
* outputs will be distributed between the parallel instances of the next
* processing operator.
*
* are shuffled to the next component.
*/
@Experimental
def shuffle: DataStream[T] = stream.shuffle()
......@@ -364,39 +358,53 @@ class DataStream[T](stream: JavaStream[T]) {
/**
* Sets the partitioning of the DataStream so that the output tuples
* are forwarded to the local subtask of the next component (whenever
* possible). This is the default partitioner setting. This setting only
* effects the how the outputs will be distributed between the parallel
* instances of the next processing operator.
*
* possible).
*/
@Experimental
def forward: DataStream[T] = stream.forward()
/**
* Sets the partitioning of the DataStream so that the output tuples
* are distributed evenly to the next component.This setting only effects
* the how the outputs will be distributed between the parallel instances of
* the next processing operator.
*
* are distributed evenly to the next component.
*/
def rebalance: DataStream[T] = stream.rebalance()
/**
* Sets the partitioning of the [[DataStream]] so that the output tuples
* are distributed evenly to a subset of instances of the downstream operation.
*
* The subset of downstream operations to which the upstream operation sends
* elements depends on the degree of parallelism of both the upstream and downstream operation.
* For example, if the upstream operation has parallelism 2 and the downstream operation
* has parallelism 4, then one upstream operation would distribute elements to two
* downstream operations while the other upstream operation would distribute to the other
* two downstream operations. If, on the other hand, the downstream operation has parallelism
* 2 while the upstream operation has parallelism 4, then two upstream operations will
* distribute to one downstream operation while the other two upstream operations will
* distribute to the other downstream operation.
*
* In cases where the different parallelisms are not multiples of each other, one or several
* downstream operations will have a differing number of inputs from upstream operations.
*/
@Experimental
def rescale: DataStream[T] = stream.rescale()
/**
* Initiates an iterative part of the program that creates a loop by feeding
* back data streams. To create a streaming iteration the user needs to define
* a transformation that creates two DataStreams. The first one is the output
* that will be fed back to the start of the iteration and the second is the output
* stream of the iterative part.
* <p>
*
* stepfunction: initialStream => (feedback, output)
* <p>
*
* A common pattern is to use output splitting to create feedback and output DataStream.
* Please refer to the .split(...) method of the DataStream
* <p>
*
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the iteration head.
* If no data received in the set time the stream terminates.
* <p>
*
* By default the feedback partitioning is set to match the input, to override this set
* the keepPartitioning flag to true
*
......@@ -424,9 +432,8 @@ class DataStream[T](stream: JavaStream[T]) {
*
* This allows the user to distinguish standard input from feedback inputs.
*
* <p>
* stepfunction: initialStream => (feedback, output)
* <p>
*
* The user must set the max waiting time for the iteration head.
* If no data is received in the set time the stream terminates. If this parameter is set
* to 0 then the iteration sources will run indefinitely, so the job must be killed to stop.
......