提交 8998b2fe 编写于 作者: F Fabian Hueske

[FLINK-12749][docs] Revert commit f695a76b

上级 d49e174f
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="713px" height="359px" viewBox="-0.5 -0.5 713 359" content="&lt;mxfile modified=&quot;2019-07-30T06:33:46.579Z&quot; host=&quot;www.draw.io&quot; agent=&quot;Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0&quot; etag=&quot;Gyms1__7o2-6Tou9Fwcv&quot; version=&quot;11.0.7&quot; type=&quot;device&quot;&gt;&lt;diagram id=&quot;axHalsAsTUV6G1jOH0Rx&quot; name=&quot;Page-1&quot;&gt;7Vprc6o4GP41fmwHiAH8qNZ6pnt22t2eM6f9tBMhAhUJG+Ktv34TCXJHpa66O850bPKQvLx570nogOF8PaYodH8nNvY7mmKvO+Cho2lqVzX5P4FsYsSAEnCoZ8tBKfDqfWIJKhJdeDaOcgMZIT7zwjxokSDAFsthiFKyyg+bEj//1hA5uAS8Wsgvo788m7kxakIlxb9hz3GTN6uKfDJHyWAJRC6yySoDgVEHDCkhLG7N10PsC+ElconnPdY83TFGccAOmfD5jXz+fHb0fgSmLzPrbfj8V/9Oi6kskb+QC5bMsk0iAc53KJrWxvcCG9MOGKxcj+HXEFkCX3HFc8xlc5/3VN6ckAUfaH+f7ABkzRwq0OcF41SwxClhiHkk4N2ewvtTz/eHxCd0+2YwnU41y+J4xCiZ4cwTW5/oUOdPykKQclliyvA6A0mhjDGZY0Y3fIh8CnU9niItVAO9uL9K9d2FMeRmVa1IS0bSxJwd6VQLvCEVUa2UJcHvxm/4j6ePX87ineAIT8d36k0pmmFcmVJgSQfY5pFCdgllLnFIgPxRig5oLHP+XAgyHfOdkFBK+wMztpFhDy0YySuNC5Bu3sT8e5h037PPHtaSeNzbyF6tBiKyoBZuWKY0NYaog9l+GxUyaNQnxT63pmU+mlYpR059IR5nObWDrpK3gx7Ik4gZlbMKKt6x0V7rFfFxqHX6gye0RBx/ocTCUSSwARe67jPhWJS3HLZVQ4zY3rIITRJg6HvWjJMaLfF24fVU+AImFVgF8ab3PSAmOB/jAFPECD2GesEBMoZaDBKmhauDxMSEXdhooocHiV7BNnSzFCPUpI7IBokkeX8lRlRmU3CJGNHe10HZ15uKhNP5+pcCMahxyUeew4QjPZHJf8kfh1z/7DROWO9sde55AicEZu9yXlhpIPpFMvXaY5lEzXvvSWbm7TRNi87Xs3SyE9qXpcF5sjTQ92TpeEH/Wpbu1oSEHyT0rH3BYOdwXhAuajzxUJ8LiCijc+4moVPUxADewyvztV5Z8oaQ7N8Lsa0dMG+OI4bmYQp1QD/tKOod/1N7BudPUfna+/GPKn6EUe1macO0s92zVxPki0B8p8MyiPFQoftLMxngVXQ2Hg3YikcX++G18yg2v+trZzJyydUL8oNMrt4gz+vZLZk8q2f32kWfs3p2Sx7P69ktmazw7EKpwHM76xy2Xzu8ElCTQ3RZB5hKqQwAKiiXAapxgk1Z5Rax4siyRQXG3fu6SzCYTLmaIwfYXICtuBuR1SvfKLCD3ULhe5M+ULatOreI6Y4C+ziqXdhI9aiolZ1oyU10OlMVb9kbM/5P4inEopt4mvPJTT7NxdVNPo113U08jfuHtuI5T/lk6oWDFNA1Snm8q1TcL5onuF50yYf1s//n20s0dqIfd48zjJ8OufPFgd0XXzTwnuWjKPKsTs0RpHHsGWThgHDvoWR9KVIWekaksKEy+uIRZOEuCKqFgqvmBLJEBxp5QkArEDrdhWOlHVRcOB5tBzWXxuk1cdOl8bGm0HQsuPd8untJk4GKcq8V4gA0WpqNWTAbcCqz4d30G6F4ePqlFRj9Aw==&lt;/diagram&gt;&lt;/mxfile&gt;"><defs/><g><path d="M 476 181 C 476 151.67 521 151.67 521 181 L 521 245 C 521 274.33 476 274.33 476 245 Z" fill="#fff2cc" stroke="#d6b656" stroke-miterlimit="10" transform="rotate(90,498.5,213)" pointer-events="none"/><path d="M 476 181 C 476 203 521 203 521 181" fill="none" stroke="#d6b656" stroke-miterlimit="10" transform="rotate(90,498.5,213)" pointer-events="none"/><path d="M 187 181 C 187 151.67 232 151.67 232 181 L 232 245 C 232 274.33 187 274.33 187 245 Z" fill="#fff2cc" stroke="#d6b656" stroke-miterlimit="10" transform="rotate(90,209.5,213)" pointer-events="none"/><path d="M 187 181 C 187 203 232 203 232 181" fill="none" stroke="#d6b656" stroke-miterlimit="10" transform="rotate(90,209.5,213)" pointer-events="none"/><path d="M 110 213 L 149.63 213" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 154.88 213 L 147.88 216.5 L 149.63 213 L 147.88 209.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><rect x="0" y="188" width="110" height="50" fill="#f8cecc" stroke="#b85450" pointer-events="none"/><g transform="translate(10.5,192.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="88" height="41" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Java Process»<br /><div><b>Click Event <br /></b></div><div><b>Data Generator</b></div></div></div></foreignObject><text x="44" y="27" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><path d="M 409 213 L 429 213 L 425 213 L 438.13 213" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 443.38 213 L 436.38 216.5 L 438.13 213 L 436.38 209.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><rect x="299" y="188" width="110" height="50" fill="#f8cecc" stroke="#b85450" pointer-events="none"/><g transform="translate(321.5,192.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="65" height="41" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Flink Job»<br /><div><b>Click Event <br /></b></div><div><b>Count</b></div></div></div></foreignObject><text x="33" y="27" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><path d="M 264 213 L 284 213 L 279 213 L 292.63 213" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 297.88 213 L 290.88 216.5 L 292.63 213 L 290.88 209.5 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><g transform="translate(178.5,199.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="43" height="27" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Topic»<br /><b>input</b></div></div></foreignObject><text x="22" y="20" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><g transform="translate(69.5,6.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="306" height="156" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">{"timestamp":"01-01-1970 12:12:11:160","page":"/about"}<br />{"timestamp":"01-01-1970 12:12:11:160","page":"/news"}<br />{"timestamp":"01-01-1970 12:12:11:175","page":"/help"}<br />{"timestamp":"01-01-1970 12:12:11:175","page":"/index"}<br />{"timestamp":"01-01-1970 12:12:11:175","page":"/shop"}<br />{"timestamp":"01-01-1970 12:12:11:175","page":"/jobs"}<br />{"timestamp":"01-01-1970 12:12:11:175","page":"/about"}<br />{"timestamp":"01-01-1970 12:12:11:175","page":"/news"}<br />{"timestamp":"01-01-1970 12:12:11:190","page":"/help"}<br />{"timestamp":"01-01-1970 12:12:11:190","page":"/index"}<br />{"timestamp":"01-01-1970 12:12:11:190","page":"/shop"}</div></div></foreignObject><text x="153" y="84" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><g transform="translate(464.5,199.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="43" height="27" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Topic»<br /><b>output</b></div></div></foreignObject><text x="22" y="20" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><g transform="translate(98.5,273.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="614" height="84" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: left;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">{"windowStart":"01-01-1970 12:00:30:000","windowEnd":"01-01-1970 12:00:45:000","page":"/help","count":1000}<br />{"windowStart":"01-01-1970 12:00:30:000","windowEnd":"01-01-1970 12:00:45:000","page":"/shop","count":1000}<br />{"windowStart":"01-01-1970 12:00:30:000","windowEnd":"01-01-1970 12:00:45:000","page":"/index","count":1000}<br />{"windowStart":"01-01-1970 12:00:30:000","windowEnd":"01-01-1970 12:00:45:000","page":"/about","count":1000}<br />{"windowStart":"01-01-1970 12:00:30:000","windowEnd":"01-01-1970 12:00:45:000","page":"/news","count":1000}<br />{"windowStart":"01-01-1970 12:00:30:000","windowEnd":"01-01-1970 12:00:45:000","page":"/jobs","count":1000}<br /></div></div></foreignObject><text x="307" y="48" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><path d="M 400.25 267 L 473.97 242.04" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 478.94 240.36 L 473.43 245.92 L 473.97 242.04 L 471.19 239.29 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 204.48 171 L 201.95 181.8" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 200.75 186.91 L 198.94 179.3 L 201.95 181.8 L 205.76 180.89 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/></g></svg>
\ No newline at end of file
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" version="1.1" width="681px" height="221px" viewBox="-0.5 -0.5 681 221" content="&lt;mxfile modified=&quot;2019-07-30T05:46:19.236Z&quot; host=&quot;www.draw.io&quot; agent=&quot;Mozilla/5.0 (X11; Linux x86_64; rv:66.0) Gecko/20100101 Firefox/66.0&quot; etag=&quot;6b7qPJhosj6WVEuTns2y&quot; version=&quot;11.0.7&quot; type=&quot;device&quot;&gt;&lt;diagram id=&quot;zIUxMKcIWk6lTGESeTwo&quot; name=&quot;Page-1&quot;&gt;7VnbctowEP0aHtPBNjbwGJtcph3SzNBpm7wJazEaZMsji0v69ZWwDL4RksYESAMPSGellbTnWNaKluWFqxuO4umQYaAts41XLWvQMk2jY/Tkj0KeUqRrayDgBOtGW2BE/oAG2xqdEwxJoaFgjAoSF0GfRRH4ooAhztmy2GzCaHHUGAVQAUY+olX0F8FimqI9u73Fb4EE02xko60tIcoaayCZIsyWOci6alkeZ0ykpXDlAVXBy+KS9rveYd1MjEMkXtKhH8UDQxi9Lvs+6uNg9tXl+MJMvSwQnesFtzyzdel6LBKIRMBV1XVbpkPlIO6Yy1KgShskAzxK1EQ0LKcx3jZdL188ZTGdipDKkiFtE0Kpxyjja4s16fng+xJPBGczyFnGPbsjg555yy9bR2IBXMAqB+kw3AALQfAn2URbO5oRLUnT0fXllmAj0980R25GOtKaCjaet2GXBR35V7BgNcbCNSXRTLoaokQAPws2jO6p0WE3RscPlMyGKJKbxHlwYRmnxoXTGBePTMYN4rczMZmY9UxgZ+zYTkN7lHNqTHQbY+IbmsyQdOWq+J0HHc7JPRj9HXTcoRCwNPxkdB7CPk4wWex+o0/Bn8WMqLd6e7eLIm8brMbzc4MNCJfHN6YC83K3LxeJrb51InHWH9VDyjiHp5/DvOGso4snG6yinluWiEbEM0IL0Nr5FE5Tr+PjC2ezhpxwytGECF+qzEvWfIqShKjdORdiWBHxW5bbX2xde8hZBmrt7azylFUiOftcJ1V9yNu23da1rN9OEhI25z7sP5MLxAMQ+/dhwIU8skppjjL7GcY4UCTIoph91tGoR7jX+3N9atPplYSQLlt3yieLJT/9kh+r5CcNS8XPWlObRb9BZjV56f8ts+wpPxGdmZZd3JnMfxRaxVH3nZVWk3t/TKXZZ7mhNSY0q3tkoXWOI7TDC+ZzazqQYqr3UCGbRyKp6EYeJ0VRKcXjasQiKJ1tNYQoCSIlN6kOmYpbrjqcEh/RS20ICcZqGHc5JQJGMVpLZslRLDEup4MB71PaKw685Uwpi0FOOp0a6ZQpbu68W731+OgcGD3jaBxY1DPMyZ24fxg+BuMfj2KMbi9qUo7XXTw9e8lBia8uzK8W8D73HEioq68bkPNGMm1tJmttgPbyDZfRP1yuKavb/73S7XL776F19Rc=&lt;/diagram&gt;&lt;/mxfile&gt;"><defs/><g><rect x="0" y="70" width="110" height="50" fill="#f8cecc" stroke="#b85450" pointer-events="none"/><g transform="translate(20.5,81.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="68" height="27" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Container»<br /><b>Client</b></div></div></foreignObject><text x="34" y="20" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">«Container»&lt;br&gt;&lt;b&gt;Client&lt;/b&gt;</text></switch></g><rect x="130" y="70" width="110" height="50" fill="#f8cecc" stroke="#b85450" pointer-events="none"/><g transform="translate(149.5,81.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="70" height="27" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Container»<br /><b>Flink Master</b></div></div></foreignObject><text x="35" y="20" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><rect x="270" y="70" width="110" height="50" fill="#f8cecc" stroke="#b85450" pointer-events="none"/><g transform="translate(286.5,81.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="77" height="27" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Container»<br /><b>TaskManager</b></div></div></foreignObject><text x="39" y="20" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><rect x="420" y="70" width="110" height="50" fill="#fff2cc" stroke="#d6b656" pointer-events="none"/><g transform="translate(440.5,81.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="68" height="27" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Container»<br /><b>Zookeeper</b></div></div></foreignObject><text x="34" y="20" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><rect x="570" y="70" width="110" height="50" fill="#fff2cc" stroke="#d6b656" pointer-events="none"/><g transform="translate(586.5,81.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="77" height="27" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Container»<br /><b>Kafka Broker</b></div></div></foreignObject><text x="39" y="20" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><rect x="130" y="170" width="110" height="50" fill="#f5f5f5" stroke="#666666" pointer-events="none"/><g transform="translate(135.5,174.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="98" height="41" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(51, 51, 51); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Named Volume»<br /><div><b>Checkpoint <br /></b></div><div><b>Directory</b></div></div></div></foreignObject><text x="49" y="27" fill="#333333" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><rect x="270" y="170" width="110" height="50" fill="#f5f5f5" stroke="#666666" pointer-events="none"/><g transform="translate(283.5,174.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="83" height="41" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(51, 51, 51); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Host Volume»<br /><div><b>Savepoint<br /></b></div><div><b>Directory</b></div></div></div></foreignObject><text x="42" y="27" fill="#333333" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g><path d="M 185 120 L 185 163.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 185 168.88 L 181.5 161.88 L 185 163.63 L 188.5 161.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 185 120 L 319 167.86" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 323.95 169.62 L 316.18 170.57 L 319 167.86 L 318.53 163.97 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 325 120 L 191 167.86" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 186.05 169.62 L 191.47 163.97 L 191 167.86 L 193.82 170.57 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 325 120 L 325 163.63" fill="none" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><path d="M 325 168.88 L 321.5 161.88 L 325 163.63 L 328.5 161.88 Z" fill="#000000" stroke="#000000" stroke-miterlimit="10" pointer-events="none"/><g transform="translate(329.5,138.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="40" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 41px; white-space: nowrap; overflow-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;white-space:normal;">mounts</div></div></foreignObject><text x="20" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">mounts</text></switch></g><g transform="translate(140.5,138.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="40" height="12" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; width: 41px; white-space: nowrap; overflow-wrap: normal; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;white-space:normal;">mounts</div></div></foreignObject><text x="20" y="12" fill="#000000" text-anchor="middle" font-size="12px" font-family="Helvetica">mounts</text></switch></g><rect x="570" y="0" width="110" height="50" fill="#ffffff" stroke="#000000" pointer-events="none"/><g transform="translate(580.5,4.5)"><switch><foreignObject style="overflow:visible;" pointer-events="all" width="88" height="41" requiredFeatures="http://www.w3.org/TR/SVG11/feature#Extensibility"><div xmlns="http://www.w3.org/1999/xhtml" style="display: inline-block; font-size: 12px; font-family: &quot;Helvetica&quot;; color: rgb(0, 0, 0); line-height: 1.2; vertical-align: top; white-space: nowrap; text-align: center;"><div xmlns="http://www.w3.org/1999/xhtml" style="display:inline-block;text-align:inherit;text-decoration:inherit;">«Container»<br /><div><b>Click Event <br /></b></div><div><b>Data Generator</b></div></div></div></foreignObject><text x="44" y="27" fill="#000000" text-anchor="middle" font-size="12px" font-family="'Helvetica'">[Not supported by viewer]</text></switch></g></g></svg>
\ No newline at end of file
---
title: Docker Playgrounds
nav-id: docker-playgrounds
nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds'
nav-parent_id: getting-started
nav-pos: 3
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
---
title: Docker Playgrounds
nav-id: docker-playgrounds
nav-title: '<i class="fa fa-ship title appetizer" aria-hidden="true"></i> Docker Playgrounds'
nav-parent_id: getting-started
nav-pos: 3
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
......@@ -191,13 +191,6 @@ under the License.
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming-click-event-count_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming-twitter_${scala.binary.version}</artifactId>
......
......@@ -239,17 +239,6 @@ under the License.
<exclude>original-*.jar</exclude>
</excludes>
</fileSet>
<fileSet>
<directory>../flink-examples/flink-examples-build-helper/flink-examples-streaming-click-event-count/target</directory>
<outputDirectory>examples/streaming</outputDirectory>
<fileMode>0644</fileMode>
<includes>
<include>*.jar</include>
</includes>
<excludes>
<exclude>original-*.jar</exclude>
</excludes>
</fileSet>
<!-- copy jar files of the gelly examples -->
<fileSet>
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-examples-build-helper</artifactId>
<groupId>org.apache.flink</groupId>
<version>1.10-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
<artifactId>flink-examples-streaming-click-event-count_${scala.binary.version}</artifactId>
<name>flink-examples-streaming-click-event-count</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-examples-streaming_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<build>
<finalName>ClickEventCount</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadeTestJar>false</shadeTestJar>
<shadedArtifactAttached>false</shadedArtifactAttached>
<createDependencyReducedPom>false</createDependencyReducedPom>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventCount</mainClass>
</transformer>
</transformers>
<artifactSet>
<includes>
<include>org.apache.flink:flink-connector-kafka*</include>
<include>org.apache.flink:flink-examples-streaming*</include>
<include>org.apache.kafka:*</include>
</includes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.flink:flink-examples-streaming_*</artifact>
<includes>
<include>org/apache/flink/streaming/examples/windowing/clickeventcount/**</include>
</includes>
</filter>
<filter>
<artifact>org.apache.kafka:*</artifact>
<excludes>
<exclude>LICENSE</exclude>
<!-- Does not contain anything relevant.
Cites a binary dependency on jersey, but this is neither reflected in the
dependency graph, nor are any jersey files bundled. -->
<exclude>NOTICE</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
flink-examples-streaming-click-event-count
Copyright 2014-2019 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.apache.kafka:kafka-clients:2.2.0
......@@ -6,4 +6,4 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.apache.kafka:kafka-clients:2.2.0
- org.apache.kafka:kafka-clients:0.10.2.1
......@@ -36,7 +36,6 @@ under the License.
<module>flink-examples-streaming-twitter</module>
<module>flink-examples-streaming-state-machine</module>
<module>flink-examples-streaming-gcp-pubsub</module>
<module>flink-examples-streaming-click-event-count</module>
</modules>
</project>
......@@ -58,7 +58,7 @@ under the License.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
......@@ -364,6 +364,7 @@ under the License.
</includes>
</configuration>
</execution>
</executions>
</plugin>
......
......@@ -20,7 +20,7 @@ package org.apache.flink.streaming.examples.statemachine;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorSource;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializer;
......@@ -46,7 +46,7 @@ public class KafkaEventsGeneratorJob {
env
.addSource(new EventsGeneratorSource(errorRate, sleep))
.addSink(new FlinkKafkaProducer<>(brokers, kafkaTopic, new EventDeSerializer()));
.addSink(new FlinkKafkaProducer010<>(brokers, kafkaTopic, new EventDeSerializer()));
// trigger program execution
env.execute("State machine example Kafka events generator job");
......
......@@ -29,7 +29,7 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.examples.statemachine.dfa.State;
import org.apache.flink.streaming.examples.statemachine.event.Alert;
import org.apache.flink.streaming.examples.statemachine.event.Event;
......@@ -82,7 +82,7 @@ public class StateMachineExample {
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", brokers);
FlinkKafkaConsumer<Event> kafka = new FlinkKafkaConsumer<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>(kafkaTopic, new EventDeSerializer(), kafkaProps);
kafka.setStartFromLatest();
kafka.setCommitOffsetsOnCheckpoints(false);
source = kafka;
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.examples.windowing.clickeventcount.functions.ClickEventStatisticsCollector;
import org.apache.flink.streaming.examples.windowing.clickeventcount.functions.CountingAggregator;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventDeserializationSchema;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatistics;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatisticsSerializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
/**
* A simple streaming job reading {@link ClickEvent}s from Kafka, counting events per 15 seconds and
* writing the resulting {@link ClickEventStatistics} back to Kafka.
*
* <p> It can be run with or without checkpointing and with event time or processing time semantics.
* </p>
*
* <p>The Job can be configured via the command line:</p>
* * "--checkpointing": enables checkpointing
* * "--event-time": set the StreamTimeCharacteristic to EventTime
* * "--input-topic": the name of the Kafka Topic to consume {@link ClickEvent}s from
* * "--output-topic": the name of the Kafka Topic to produce {@link ClickEventStatistics} to
* * "--bootstrap.servers": comma-separated list of Kafka brokers
*
*/
public class ClickEventCount {
public static final String CHECKPOINTING_OPTION = "checkpointing";
public static final String EVENT_TIME_OPTION = "event-time";
public static final Time WINDOW_SIZE = Time.of(15, TimeUnit.SECONDS);
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
configureEnvironment(params, env);
String inputTopic = params.get("input-topic", "input");
String outputTopic = params.get("output-topic", "output");
String brokers = params.get("bootstrap.servers", "localhost:9092");
Properties kafkaProps = new Properties();
kafkaProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "click-event-count");
env.addSource(new FlinkKafkaConsumer<>(inputTopic, new ClickEventDeserializationSchema(), kafkaProps))
.name("ClickEvent Source")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ClickEvent>(Time.of(200, TimeUnit.MILLISECONDS)) {
@Override
public long extractTimestamp(final ClickEvent element) {
return element.getTimestamp().getTime();
}
})
.keyBy(ClickEvent::getPage)
.timeWindow(WINDOW_SIZE)
.aggregate(new CountingAggregator(),
new ClickEventStatisticsCollector())
.name("ClickEvent Counter")
.addSink(new FlinkKafkaProducer<>(
outputTopic,
new ClickEventStatisticsSerializationSchema(outputTopic),
kafkaProps,
FlinkKafkaProducer.Semantic.AT_LEAST_ONCE))
.name("ClickEventStatistics Sink");
env.execute("Click Event Count");
}
private static void configureEnvironment(
final ParameterTool params,
final StreamExecutionEnvironment env) {
boolean checkpointingEnabled = params.has(CHECKPOINTING_OPTION);
boolean eventTimeSemantics = params.has(EVENT_TIME_OPTION);
if (checkpointingEnabled) {
env.enableCheckpointing(1000);
}
if (eventTimeSemantics) {
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
}
//disabling Operator chaining to make it easier to follow the Job in the WebUI
env.disableOperatorChaining();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventSerializationSchema;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static org.apache.flink.streaming.examples.windowing.clickeventcount.ClickEventCount.WINDOW_SIZE;
/**
* A generator which pushes {@link ClickEvent}s into a Kafka Topic configured via `--topic` and
* `--bootstrap.servers`.
*
* <p> The generator creates the same number of {@link ClickEvent}s for all pages. The delay between
* events is chosen such that processing time and event time roughly align. The generator always
* creates the same sequence of events. </p>
*
*/
public class ClickEventGenerator {
public static final int EVENTS_PER_WINDOW = 1000;
private static final List<String> pages = Arrays.asList("/help", "/index", "/shop", "/jobs", "/about", "/news");
//this calculation is only accurate as long as pages.size() * EVENTS_PER_WINDOW divides the
//window size
public static final long DELAY = WINDOW_SIZE.toMilliseconds() / pages.size() / EVENTS_PER_WINDOW;
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
String topic = params.get("topic", "input");
Properties kafkaProps = createKafkaProperties(params);
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(kafkaProps);
ClickIterator clickIterator = new ClickIterator();
while (true) {
ProducerRecord<byte[], byte[]> record = new ClickEventSerializationSchema(topic).serialize(
clickIterator.next(),
null);
producer.send(record);
Thread.sleep(DELAY);
}
}
private static Properties createKafkaProperties(final ParameterTool params) {
String brokers = params.get("bootstrap.servers", "localhost:9092");
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
return kafkaProps;
}
static class ClickIterator {
private Map<String, Long> nextTimestampPerKey;
private int nextPageIndex;
ClickIterator() {
nextTimestampPerKey = new HashMap<>();
nextPageIndex = 0;
}
ClickEvent next() {
String page = nextPage();
return new ClickEvent(nextTimestamp(page), page);
}
private Date nextTimestamp(String page) {
long nextTimestamp = nextTimestampPerKey.getOrDefault(page, 0L);
nextTimestampPerKey.put(page, nextTimestamp + WINDOW_SIZE.toMilliseconds() / EVENTS_PER_WINDOW);
return new Date(nextTimestamp);
}
private String nextPage() {
String nextPage = pages.get(nextPageIndex);
if (nextPageIndex == pages.size() - 1) {
nextPageIndex = 0;
} else {
nextPageIndex++;
}
return nextPage;
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount.functions;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEventStatistics;
import org.apache.flink.util.Collector;
import java.util.Date;
/**
* A simple {@link ProcessWindowFunction}, which wraps a count of {@link ClickEvent}s into an
* instance of {@link ClickEventStatistics}.
*
**/
public class ClickEventStatisticsCollector
extends ProcessWindowFunction<Long, ClickEventStatistics, String, TimeWindow> {
@Override
public void process(
final String page,
final Context context,
final Iterable<Long> elements,
final Collector<ClickEventStatistics> out) throws Exception {
Long count = elements.iterator().next();
out.collect(new ClickEventStatistics(new Date(context.window().getStart()), new Date(context.window().getEnd()), page, count));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount.functions;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.examples.windowing.clickeventcount.records.ClickEvent;
/**
* An {@link AggregateFunction} which simply counts {@link ClickEvent}s.
*
*/
public class CountingAggregator implements AggregateFunction<ClickEvent, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(final ClickEvent value, final Long accumulator) {
return accumulator + 1;
}
@Override
public Long getResult(final Long accumulator) {
return accumulator;
}
@Override
public Long merge(final Long a, final Long b) {
return a + b;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
import java.util.Date;
import java.util.Objects;
/**
* A simple event recording a click on a {@link ClickEvent#page} at time {@link ClickEvent#timestamp}.
*
*/
public class ClickEvent {
//using java.util.Date for better readability in Flink Cluster Playground
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
private Date timestamp;
private String page;
public ClickEvent() {
}
public ClickEvent(final Date timestamp, final String page) {
this.timestamp = timestamp;
this.page = page;
}
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(final Date timestamp) {
this.timestamp = timestamp;
}
public String getPage() {
return page;
}
public void setPage(final String page) {
this.page = page;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ClickEvent that = (ClickEvent) o;
return Objects.equals(timestamp, that.timestamp) && Objects.equals(page, that.page);
}
@Override
public int hashCode() {
return Objects.hash(timestamp, page);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ClickEvent{");
sb.append("timestamp=").append(timestamp);
sb.append(", page='").append(page).append('\'');
sb.append('}');
return sb.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
* A Kafka {@link DeserializationSchema} to deserialize {@link ClickEvent}s from JSON.
*
*/
public class ClickEventDeserializationSchema implements DeserializationSchema<ClickEvent> {
private static final long serialVersionUID = 1L;
private static final ObjectMapper objectMapper = new ObjectMapper();
@Override
public ClickEvent deserialize(byte[] message) throws IOException {
return objectMapper.readValue(message, ClickEvent.class);
}
@Override
public boolean isEndOfStream(ClickEvent nextElement) {
return false;
}
@Override
public TypeInformation<ClickEvent> getProducedType() {
return TypeInformation.of(ClickEvent.class);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
/**
* A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEvent}s as JSON.
*
*/
public class ClickEventSerializationSchema implements KafkaSerializationSchema<ClickEvent> {
private static final ObjectMapper objectMapper = new ObjectMapper();
private String topic;
public ClickEventSerializationSchema(){
}
public ClickEventSerializationSchema(String topic) {
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(
final ClickEvent message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonFormat;
import java.util.Date;
import java.util.Objects;
/**
* A small wrapper class for windowed page counts.
*
*/
public class ClickEventStatistics {
//using java.util.Date for better readability in Flink Cluster Playground
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
private Date windowStart;
//using java.util.Date for better readability in Flink Cluster Playground
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "dd-MM-yyyy hh:mm:ss:SSS")
private Date windowEnd;
private String page;
private long count;
public ClickEventStatistics() {
}
public ClickEventStatistics(
final Date windowStart,
final Date windowEnd,
final String page,
final long count) {
this.windowStart = windowStart;
this.windowEnd = windowEnd;
this.page = page;
this.count = count;
}
public Date getWindowStart() {
return windowStart;
}
public void setWindowStart(final Date windowStart) {
this.windowStart = windowStart;
}
public Date getWindowEnd() {
return windowEnd;
}
public void setWindowEnd(final Date windowEnd) {
this.windowEnd = windowEnd;
}
public String getPage() {
return page;
}
public void setPage(final String page) {
this.page = page;
}
public long getCount() {
return count;
}
public void setCount(final long count) {
this.count = count;
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ClickEventStatistics that = (ClickEventStatistics) o;
return count == that.count &&
Objects.equals(windowStart, that.windowStart) &&
Objects.equals(windowEnd, that.windowEnd) &&
Objects.equals(page, that.page);
}
@Override
public int hashCode() {
return Objects.hash(windowStart, windowEnd, page, count);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("ClickEventStatistics{");
sb.append("windowStart=").append(windowStart);
sb.append(", windowEnd=").append(windowEnd);
sb.append(", page='").append(page).append('\'');
sb.append(", count=").append(count);
sb.append('}');
return sb.toString();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.examples.windowing.clickeventcount.records;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
/**
* A Kafka {@link KafkaSerializationSchema} to serialize {@link ClickEventStatistics}s as JSON.
*
*/
public class ClickEventStatisticsSerializationSchema implements KafkaSerializationSchema<ClickEventStatistics> {
private static final ObjectMapper objectMapper = new ObjectMapper();
private String topic;
public ClickEventStatisticsSerializationSchema(){
}
public ClickEventStatisticsSerializationSchema(String topic) {
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(
final ClickEventStatistics message, @Nullable final Long timestamp) {
try {
//if topic is null, default topic will be used
return new ProducerRecord<>(topic, objectMapper.writeValueAsBytes(message));
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + message, e);
}
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册