43.md 16.5 KB
Newer Older
W
wizardforcel 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169


# Python编程指南(流)Beta

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)


Flink中的分析流程序是实现流数据集转换(例如,Filter,映射,Join,分组)的常规程序。流数据集最初是从某些源创建的(例如,通过读取文件或从集合中创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink流处理程序可在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

为了创建自己的Flink流处理程序,我们建议您从[程序框架](#program-skeleton)开始, 逐步添加自己的 [转换](#transformations)。其余部分充当其他 算子操作和高级函数的参考。

## Jython框架

Flink Python流API使用Jython框架(请参阅[http://www.jython.org/archive/21/docs/whatis.html](http://www.jython.org/archive/21/docs/whatis.html))来驱动给定脚本的执行。Python流层实际上是现有Java流API的薄打包层。

#### 约束

使用Jython有两个主要限制:

*   最新的Python支持版本是2.7
*   使用Python C扩展并不简单

## 流程序示例

以下流处理程序是WordCount的完整工作示例。您可以复制并粘贴代码以在本地运行它(请参阅本节后面的注释)。它计算句子流中每个单词的数量(不区分大小写),窗口大小为50毫秒,并将结果打印到标准输出中。



```
from org.apache.flink.streaming.api.functions.source import SourceFunction
from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction
from org.apache.flink.api.java.functions import KeySelector
from org.apache.flink.streaming.api.windowing.time.Time import milliseconds

class Generator(SourceFunction):
    def __init__(self, num_iters):
        self._running = True
        self._num_iters = num_iters

    def run(self, ctx):
        counter = 0
        while self._running and counter < self._num_iters:
            ctx.collect('Hello World')
            counter += 1

    def cancel(self):
        self._running = False

class Tokenizer(FlatMapFunction):
    def flatMap(self, value, collector):
        for word in value.lower().split():
            collector.collect((1, word))

class Selector(KeySelector):
    def getKey(self, input):
        return input[1]

class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, word1 = input1
        count2, word2 = input2
        return (count1 + count2, word1)

def main(factory):
    env = factory.get_execution_environment()
    env.create_python_source(Generator(num_iters=1000)) \
        .flat_map(Tokenizer()) \
        .key_by(Selector()) \
        .time_window(milliseconds(50)) \
        .reduce(Sum()) \
        .output()
    env.execute()
```



**笔记:**

*   在多节点群集上执行需要共享介质存储,需要预先配置(.eg HDFS)。
*   给定脚本的输出定向到标准输出。因此,输出将写入相应的工作`.out`文件。如果脚本在IntelliJ IDE中执行,则输出将显示在控制台选项卡中。

## 程序框架

正如我们在示例中看到的那样,Flink流程序看起来像普通的Python程序。每个程序包含相同的基本部分:

1.  一个`main(factory)`函数定义,带有一个环境工厂参数 - 程序入口点,
2.  `Environment`从工厂获得,
3.  加载/创建初始数据,
4.  指定此数据的转换,
5.  指定计算结果的放置位置,和
6.  执行你的程序。

我们现在将概述每个步骤,但请参阅相应部分以获取更多详细信息。

`main(factory)`函数是必须的,Flink执行层使用它来运行给定的Python流程序。

`Environment`是所有Flink计划的基础。您可以使用工厂提供的工厂方法获得一个:



```
factory.get_execution_environment()
```



为了指定数据源,流运行环境有几种方法。要将文本文件作为一系列行读取,您可以使用:



```
env = factory.get_execution_environment()
text = env.read_text_file("file:///path/to/file")
```



这将为您提供一个DataStream,然后您可以在其上应用转换。有关数据源和输入格式的更多信息,请参阅 [数据源](#data-sources)

拥有DataStream后,您可以应用转换来创建新的DataStream,然后可以将其写入文件,再次转换或与其他DataStream结合使用。您可以通过使用自己的自定义转换函数调用DataStream上的方法来应用转换。例如,Map转换如下所示:



```
class Doubler(MapFunction):
    def map(self, value):
        return value * 2

data.map(Doubler())
```



这将通过将原始DataStream中的每个值加倍来创建新的DataStream。有关更多信息和所有转换的列表,请参阅[转换](#transformations)

一旦有了需要写入磁盘的DataStream,就可以在DataStream上调用其中一个方法:



```
data.write_as_text("<file-path>")
data.write_as_text("<file-path>", mode=WriteMode.OVERWRITE)
data.output()
```



最后一种方法仅对本地机器上的开发/调试有用,它会将DataSet的内容输出到标准输出。(请注意,在集群中,结果将转到集群节点的标准输出流,最终会出现在工作程序的_.out_文件中)。前两个顾名思义。有关写入文件的更多信息,请参阅[数据接收器](#data-sinks)

一旦您指定的完整程序,你需要调用`execute``Environment`。这将在本地计算机上执行或提交程序以在群集上执行,具体取决于Flink的启动方式。

## 项目设置

除了设置Flink外,无需额外的工作。使用Jython执行Python脚本意味着不需要外部包,程序就像是一个jar文件一样执行。

Python API在Windows / Linux / OSX系统上进行了测试。

## 懒惰的评价

所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当`execute()`在Environment对象上调用其中一个方法时,实际执行这些 算子操作。程序是在本地执行还是在集群上执行取决于程序的环境。

懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。

## 转换

数据转换将一个或多个DataStream转换为新的DataStream。程序可以将多个转换组合到复杂的程序集中。

本节简要概述了可用的转换。该[转换文档](dataset_transformations.html)与示例全部转换的完整描述。

W
wizardforcel 已提交
170 171 172 173 174
---

转换:**Map** PythonDataStream→PythonDataStream

描述:采用一个数据元并生成一个数据元。
W
wizardforcel 已提交
175 176 177 178 179 180 181 182 183 184 185 186


```
class Doubler(MapFunction):
    def map(self, value):
        return value * 2

data_stream.map(Doubler())
```



W
wizardforcel 已提交
187 188 189 190 191 192
---

转换:**FlatMap** PythonDataStream→PythonDataStream

描述:采用一个数据元并生成零个,一个或多个数据元。

W
wizardforcel 已提交
193 194 195 196 197 198 199 200 201 202 203

```
class Tokenizer(FlatMapFunction):
    def flatMap(self, word, collector):
        collector.collect((1, word))

data_stream.flat_map(Tokenizer())
```



W
wizardforcel 已提交
204 205 206 207 208 209
---

转换:**Filter** PythonDataStream→PythonDataStream

描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。

W
wizardforcel 已提交
210 211 212 213 214 215 216 217 218 219 220

```
class GreaterThen1000(FilterFunction):
    def filter(self, value):
        return value &gt; 1000

data_stream.filter(GreaterThen1000())
```



W
wizardforcel 已提交
221 222 223 224 225 226
---

转换:**KeyBy** PythonDataStream→PythonKeyedStream

描述:逻辑上将流分区为不相交的分区,每个分区包含相同Keys的数据元。在内部,这是通过散列分区实现的。见[键](/dev/api_concepts#specifying-keys)如何指定键。此转换返回PythonKeyedDataStream。

W
wizardforcel 已提交
227 228 229 230 231 232 233 234 235 236 237

```
class Selector(KeySelector):
    def getKey(self, input):
        return input[1]  # Key by the second element in a tuple

data_stream.key_by(Selector()) // Key by field "someKey" 
```



W
wizardforcel 已提交
238 239 240 241 242 243
---

转换:**Reduce** PythonKeyedStream→PythonDataStream

描述:被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。

W
wizardforcel 已提交
244 245 246 247 248 249 250 251 252 253 254 255 256

```
class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, val1 = input1
        count2, val2 = input2
        return (count1 + count2, val1)

data.reduce(Sum())
```



W
wizardforcel 已提交
257 258 259 260 261 262
---

转换:**Window** PythonKeyedStream→PythonWindowedStream

描述:可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关[窗口](windows.html)的完整说明,请参见windows。

W
wizardforcel 已提交
263 264 265 266 267 268 269 270 271 272 273

```
keyed_stream.count_window(10, 5)  # Last 10 elements, sliding (jumping) by 5 elements

keyed_stream.time_window(milliseconds(30))  # Last 30 milliseconds of data

keted_stream.time_window(milliseconds(100), milliseconds(20))  # Last 100 milliseconds of data, sliding (jumping) by 20 milliseconds 
```



W
wizardforcel 已提交
274 275 276 277 278 279
---

转换:**Window Apply** PythonWindowedStream→PythonDataStream

描述:将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。

W
wizardforcel 已提交
280 281 282 283 284 285 286 287 288 289 290 291 292 293

```
class WindowSum(WindowFunction):
    def apply(self, key, window, values, collector):
        sum = 0
        for value in values:
            sum += value[0]
        collector.collect((key, sum))

windowed_stream.apply(WindowSum()) 
```



W
wizardforcel 已提交
294 295 296 297 298 299
---

转换:**Window Reduce** PythonWindowedStream→PythonDataStream

描述:将函数缩减函数应用于窗口并返回缩小的值。

W
wizardforcel 已提交
300 301 302 303 304 305 306 307 308 309 310 311 312

```
class Sum(ReduceFunction):
    def reduce(self, input1, input2):
        count1, val1 = input1
        count2, val2 = input2
        return (count1 + count2, val1)

windowed_stream.reduce(Sum()) 
```



W
wizardforcel 已提交
313 314 315 316 317 318
---

转换:**Union** PythonDataStream *→PythonDataStream

描述:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。

W
wizardforcel 已提交
319 320 321 322 323 324 325

```
data_stream.union(other_stream1, other_stream2, ...); 
```



W
wizardforcel 已提交
326 327 328 329 330 331
---

转换:**Split** PythonDataStream→PythonSplitStream

描述:根据某些标准将流拆分为两个或更多个流。

W
wizardforcel 已提交
332 333 334 335 336 337 338 339 340 341 342

```
class StreamSelector(OutputSelector):
    def select(self, value):
        return ["even"] if value % 2 == 0 else ["odd"]

splited_stream = data_stream.split(StreamSelector()) 
```



W
wizardforcel 已提交
343 344 345 346 347 348
---

转换:**Select** SplitStream→DataStream

描述:从拆分流中选择一个或多个流。

W
wizardforcel 已提交
349 350 351 352 353 354 355 356 357

```
even_data_stream = splited_stream.select("even")
odd_data_stream = splited_stream.select("odd")
all_data_stream = splited_stream.select("even", "odd") 
```



W
wizardforcel 已提交
358 359 360 361 362 363
---

转换:**Iterate** PythonDataStream→PythonIterativeStream→PythonDataStream

描述:通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅[迭代](#iterations)

W
wizardforcel 已提交
364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385

```
class MinusOne(MapFunction):
    def map(self, value):
        return value - 1

class PositiveNumber(FilterFunction):
    def filter(self, value):
        return value &gt; 0

class LessEquelToZero(FilterFunction):
    def filter(self, value):
        return value &lt;= 0

iteration = initial_stream.iterate(5000)
iteration_body = iteration.map(MinusOne())
feedback = iteration_body.filter(PositiveNumber())
iteration.close_with(feedback)
output = iteration_body.filter(LessEquelToZero()) 
```


W
wizardforcel 已提交
386

W
wizardforcel 已提交
387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574

## 将函数传递给Flink

某些 算子操作需要用户定义的函数作为参数。所有函数都应该定义为派生自相关Flink函数的Python类。用户定义的函数被序列化并发送到TaskManagers以供执行。



```
class Filter(FilterFunction):
    def filter(self, value):
        return value > 5

data_stream.filter(Filter())
```



丰富的函数(.eg `RichFilterFunction`)允许定义(覆盖)可选 算子操作:`open``close`。用户可以使用这些函数进行初始化和清理。



```
class Tokenizer(RichMapFunction):
    def open(self, config):
        pass
    def close(self):
        pass
    def map(self, value):
        pass

data_stream.map(Tokenizer())
```



`open`在启动流式传输管道之前,Worker会调用该函数。`close`在流管道停止后,Worker调用该函数。

## 数据类型

Flink的Python Streaming API支持原始Python类型(int,float,bool,string),以及字节数组和用户定义的类。



```
class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

class Tokenizer(MapFunction):
    def map(self, value):
        return (1, Person(*value))

data_stream.map(Tokenizer())
```



#### 元组/列表

您可以将元组(或列表)用于复合类型。Python元组映射到Jython本机对应类型,这些类型由Python打包器薄层处理。



```
word_counts = env.from_elements(("hello", 1), ("world",2))

class Tokenizer(MapFunction):
    def map(self, value):
        return value[1]

counts = word_counts.map(Tokenizer())
```



## 数据源

数据源创建初始数据流,例如来自文件或集合。

基于文件的:

*   `read_text_file(path)` - 按行读取文件并将其作为字符串流返回。

基于集合:

*   `from_elements(*args)` - 从所有数据元创建数据流。
*   `generate_sequence(from, to)` - 并行生成给定间隔中的数字序列。

**例子**



```
env  = factory.get_execution_environment()

\# read text file from local files system
localLiens = env.read_text("file:///path/to/my/textfile")

\# read text file from a HDFS running at nnHost:nnPort
hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")

\# create a set from some given elements
values = env.from_elements("Foo", "bar", "foobar", "fubar")

\# generate a number sequence
numbers = env.generate_sequence(1, 10000000)
```



## 数据接收

数据接收器使用DataStream并用于存储或返回它们:

*   `write_as_text()` - 按字符串顺序写入数据元。通过调用每个数据元的_str()_方法获得字符串。
*   `output()`- 打印标准输出上每个数据元的_str()_值。
*   `write_to_socket()` - 将DataStream作为字节数组写入套接字[host:port]。

可以将DataStream输入到多个 算子操作。程序可以编写或打印数据流,同时对它们执行其他转换。

**例子**

标准数据接收方法:



```
 write DataStream to a file on the local file system
textData.write_as_text("file:///my/result/on/localFS")

 write DataStream to a file on a HDFS with a namenode running at nnHost:nnPort
textData.write_as_text("hdfs://nnHost:nnPort/my/result/on/localFS")

 write DataStream to a file and overwrite the file if it exists
textData.write_as_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)

 this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
values.write_as_text("file:///path/to/the/result/file")
```



## 并行执行

本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务( 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为_并行__度_或_并行__度(DOP)_。

可以在不同级别的Flink中指定任务的并行度。

### 运行环境级别

Flink程序在[运行环境](#program-skeleton)的上下文中[执行](#program-skeleton)。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。

可以通过调用`set_parallelism()`方法来指定运行环境的默认并行性 。要以并行方式执行[WordCount](#example-program)示例程序的所有 算子,数据源和数据接收器,请 `3`按如下方式设置运行环境的默认并行度:



```
env = factory.get_execution_environment()
env.set_parallelism(3)

text.flat_map(Tokenizer()) \
    .key_by(Selector()) \
    .time_window(milliseconds(30)) \
    .reduce(Sum()) \
    .print()

env.execute()
```



### 系统级别

可以通过设置`parallelism.default`属性来定义所有运行环境的系统范围默认并行度 `./conf/flink-conf.yaml`。有关详细信息,请参阅 [配置](https://flink.sojb.cn/ops/config.html)文档

## 执行计划

要使用Flink运行计划,请转到Flink分发,然后从/ bin文件夹运行pyflink-stream.sh脚本。包含该计划的脚本必须作为第一个参数传递,然后是许多其他Python包,最后由`-`将被提供给脚本的其他参数分隔。



```
./bin/pyflink-stream.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]
```