# Python编程指南(流)Beta > 译者:[flink.sojb.cn](https://flink.sojb.cn/) Flink中的分析流程序是实现流数据集转换(例如,Filter,映射,Join,分组)的常规程序。流数据集最初是从某些源创建的(例如,通过读取文件或从集合中创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink流处理程序可在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。 为了创建自己的Flink流处理程序,我们建议您从[程序框架](#program-skeleton)开始, 逐步添加自己的 [转换](#transformations)。其余部分充当其他 算子操作和高级函数的参考。 ## Jython框架 Flink Python流API使用Jython框架(请参阅[http://www.jython.org/archive/21/docs/whatis.html](http://www.jython.org/archive/21/docs/whatis.html))来驱动给定脚本的执行。Python流层实际上是现有Java流API的薄打包层。 #### 约束 使用Jython有两个主要限制: * 最新的Python支持版本是2.7 * 使用Python C扩展并不简单 ## 流程序示例 以下流处理程序是WordCount的完整工作示例。您可以复制并粘贴代码以在本地运行它(请参阅本节后面的注释)。它计算句子流中每个单词的数量(不区分大小写),窗口大小为50毫秒,并将结果打印到标准输出中。 ``` from org.apache.flink.streaming.api.functions.source import SourceFunction from org.apache.flink.api.common.functions import FlatMapFunction, ReduceFunction from org.apache.flink.api.java.functions import KeySelector from org.apache.flink.streaming.api.windowing.time.Time import milliseconds class Generator(SourceFunction): def __init__(self, num_iters): self._running = True self._num_iters = num_iters def run(self, ctx): counter = 0 while self._running and counter < self._num_iters: ctx.collect('Hello World') counter += 1 def cancel(self): self._running = False class Tokenizer(FlatMapFunction): def flatMap(self, value, collector): for word in value.lower().split(): collector.collect((1, word)) class Selector(KeySelector): def getKey(self, input): return input[1] class Sum(ReduceFunction): def reduce(self, input1, input2): count1, word1 = input1 count2, word2 = input2 return (count1 + count2, word1) def main(factory): env = factory.get_execution_environment() env.create_python_source(Generator(num_iters=1000)) \ .flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(50)) \ .reduce(Sum()) \ .output() env.execute() ``` **笔记:** * 在多节点群集上执行需要共享介质存储,需要预先配置(.eg HDFS)。 * 给定脚本的输出定向到标准输出。因此,输出将写入相应的工作`.out`文件。如果脚本在IntelliJ IDE中执行,则输出将显示在控制台选项卡中。 ## 程序框架 正如我们在示例中看到的那样,Flink流程序看起来像普通的Python程序。每个程序包含相同的基本部分: 1. 一个`main(factory)`函数定义,带有一个环境工厂参数 - 程序入口点, 2. `Environment`从工厂获得, 3. 加载/创建初始数据, 4. 指定此数据的转换, 5. 指定计算结果的放置位置,和 6. 执行你的程序。 我们现在将概述每个步骤,但请参阅相应部分以获取更多详细信息。 该`main(factory)`函数是必须的,Flink执行层使用它来运行给定的Python流程序。 这`Environment`是所有Flink计划的基础。您可以使用工厂提供的工厂方法获得一个: ``` factory.get_execution_environment() ``` 为了指定数据源,流运行环境有几种方法。要将文本文件作为一系列行读取,您可以使用: ``` env = factory.get_execution_environment() text = env.read_text_file("file:///path/to/file") ``` 这将为您提供一个DataStream,然后您可以在其上应用转换。有关数据源和输入格式的更多信息,请参阅 [数据源](#data-sources)。 拥有DataStream后,您可以应用转换来创建新的DataStream,然后可以将其写入文件,再次转换或与其他DataStream结合使用。您可以通过使用自己的自定义转换函数调用DataStream上的方法来应用转换。例如,Map转换如下所示: ``` class Doubler(MapFunction): def map(self, value): return value * 2 data.map(Doubler()) ``` 这将通过将原始DataStream中的每个值加倍来创建新的DataStream。有关更多信息和所有转换的列表,请参阅[转换](#transformations)。 一旦有了需要写入磁盘的DataStream,就可以在DataStream上调用其中一个方法: ``` data.write_as_text("") data.write_as_text("", mode=WriteMode.OVERWRITE) data.output() ``` 最后一种方法仅对本地机器上的开发/调试有用,它会将DataSet的内容输出到标准输出。(请注意,在集群中,结果将转到集群节点的标准输出流,最终会出现在工作程序的_.out_文件中)。前两个顾名思义。有关写入文件的更多信息,请参阅[数据接收器](#data-sinks)。 一旦您指定的完整程序,你需要调用`execute`的`Environment`。这将在本地计算机上执行或提交程序以在群集上执行,具体取决于Flink的启动方式。 ## 项目设置 除了设置Flink外,无需额外的工作。使用Jython执行Python脚本意味着不需要外部包,程序就像是一个jar文件一样执行。 Python API在Windows / Linux / OSX系统上进行了测试。 ## 懒惰的评价 所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。当`execute()`在Environment对象上调用其中一个方法时,实际执行这些 算子操作。程序是在本地执行还是在集群上执行取决于程序的环境。 懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。 ## 转换 数据转换将一个或多个DataStream转换为新的DataStream。程序可以将多个转换组合到复杂的程序集中。 本节简要概述了可用的转换。该[转换文档](dataset_transformations.html)与示例全部转换的完整描述。 --- 转换:**Map** PythonDataStream→PythonDataStream 描述:采用一个数据元并生成一个数据元。 ``` class Doubler(MapFunction): def map(self, value): return value * 2 data_stream.map(Doubler()) ``` --- 转换:**FlatMap** PythonDataStream→PythonDataStream 描述:采用一个数据元并生成零个,一个或多个数据元。 ``` class Tokenizer(FlatMapFunction): def flatMap(self, word, collector): collector.collect((1, word)) data_stream.flat_map(Tokenizer()) ``` --- 转换:**Filter** PythonDataStream→PythonDataStream 描述:计算每个数据元的布尔函数,并保存函数返回true的数据元。 ``` class GreaterThen1000(FilterFunction): def filter(self, value): return value > 1000 data_stream.filter(GreaterThen1000()) ``` --- 转换:**KeyBy** PythonDataStream→PythonKeyedStream 描述:逻辑上将流分区为不相交的分区,每个分区包含相同Keys的数据元。在内部,这是通过散列分区实现的。见[键](/dev/api_concepts#specifying-keys)如何指定键。此转换返回PythonKeyedDataStream。 ``` class Selector(KeySelector): def getKey(self, input): return input[1] # Key by the second element in a tuple data_stream.key_by(Selector()) // Key by field "someKey" ``` --- 转换:**Reduce** PythonKeyedStream→PythonDataStream 描述:被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。 ``` class Sum(ReduceFunction): def reduce(self, input1, input2): count1, val1 = input1 count2, val2 = input2 return (count1 + count2, val1) data.reduce(Sum()) ``` --- 转换:**Window** PythonKeyedStream→PythonWindowedStream 描述:可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关[窗口](windows.html)的完整说明,请参见windows。 ``` keyed_stream.count_window(10, 5) # Last 10 elements, sliding (jumping) by 5 elements keyed_stream.time_window(milliseconds(30)) # Last 30 milliseconds of data keted_stream.time_window(milliseconds(100), milliseconds(20)) # Last 100 milliseconds of data, sliding (jumping) by 20 milliseconds ``` --- 转换:**Window Apply** PythonWindowedStream→PythonDataStream 描述:将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。 ``` class WindowSum(WindowFunction): def apply(self, key, window, values, collector): sum = 0 for value in values: sum += value[0] collector.collect((key, sum)) windowed_stream.apply(WindowSum()) ``` --- 转换:**Window Reduce** PythonWindowedStream→PythonDataStream 描述:将函数缩减函数应用于窗口并返回缩小的值。 ``` class Sum(ReduceFunction): def reduce(self, input1, input2): count1, val1 = input1 count2, val2 = input2 return (count1 + count2, val1) windowed_stream.reduce(Sum()) ``` --- 转换:**Union** PythonDataStream *→PythonDataStream 描述:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。 ``` data_stream.union(other_stream1, other_stream2, ...); ``` --- 转换:**Split** PythonDataStream→PythonSplitStream 描述:根据某些标准将流拆分为两个或更多个流。 ``` class StreamSelector(OutputSelector): def select(self, value): return ["even"] if value % 2 == 0 else ["odd"] splited_stream = data_stream.split(StreamSelector()) ``` --- 转换:**Select** SplitStream→DataStream 描述:从拆分流中选择一个或多个流。 ``` even_data_stream = splited_stream.select("even") odd_data_stream = splited_stream.select("odd") all_data_stream = splited_stream.select("even", "odd") ``` --- 转换:**Iterate** PythonDataStream→PythonIterativeStream→PythonDataStream 描述:通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅[迭代](#iterations)。 ``` class MinusOne(MapFunction): def map(self, value): return value - 1 class PositiveNumber(FilterFunction): def filter(self, value): return value > 0 class LessEquelToZero(FilterFunction): def filter(self, value): return value <= 0 iteration = initial_stream.iterate(5000) iteration_body = iteration.map(MinusOne()) feedback = iteration_body.filter(PositiveNumber()) iteration.close_with(feedback) output = iteration_body.filter(LessEquelToZero()) ``` ## 将函数传递给Flink 某些 算子操作需要用户定义的函数作为参数。所有函数都应该定义为派生自相关Flink函数的Python类。用户定义的函数被序列化并发送到TaskManagers以供执行。 ``` class Filter(FilterFunction): def filter(self, value): return value > 5 data_stream.filter(Filter()) ``` 丰富的函数(.eg `RichFilterFunction`)允许定义(覆盖)可选 算子操作:`open`&`close`。用户可以使用这些函数进行初始化和清理。 ``` class Tokenizer(RichMapFunction): def open(self, config): pass def close(self): pass def map(self, value): pass data_stream.map(Tokenizer()) ``` `open`在启动流式传输管道之前,Worker会调用该函数。`close`在流管道停止后,Worker调用该函数。 ## 数据类型 Flink的Python Streaming API支持原始Python类型(int,float,bool,string),以及字节数组和用户定义的类。 ``` class Person: def __init__(self, name, age): self.name = name self.age = age class Tokenizer(MapFunction): def map(self, value): return (1, Person(*value)) data_stream.map(Tokenizer()) ``` #### 元组/列表 您可以将元组(或列表)用于复合类型。Python元组映射到Jython本机对应类型,这些类型由Python打包器薄层处理。 ``` word_counts = env.from_elements(("hello", 1), ("world",2)) class Tokenizer(MapFunction): def map(self, value): return value[1] counts = word_counts.map(Tokenizer()) ``` ## 数据源 数据源创建初始数据流,例如来自文件或集合。 基于文件的: * `read_text_file(path)` - 按行读取文件并将其作为字符串流返回。 基于集合: * `from_elements(*args)` - 从所有数据元创建数据流。 * `generate_sequence(from, to)` - 并行生成给定间隔中的数字序列。 **例子** ``` env = factory.get_execution_environment() \# read text file from local files system localLiens = env.read_text("file:///path/to/my/textfile") \# read text file from a HDFS running at nnHost:nnPort hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile") \# create a set from some given elements values = env.from_elements("Foo", "bar", "foobar", "fubar") \# generate a number sequence numbers = env.generate_sequence(1, 10000000) ``` ## 数据接收 数据接收器使用DataStream并用于存储或返回它们: * `write_as_text()` - 按字符串顺序写入数据元。通过调用每个数据元的_str()_方法获得字符串。 * `output()`- 打印标准输出上每个数据元的_str()_值。 * `write_to_socket()` - 将DataStream作为字节数组写入套接字[host:port]。 可以将DataStream输入到多个 算子操作。程序可以编写或打印数据流,同时对它们执行其他转换。 **例子** 标准数据接收方法: ``` write DataStream to a file on the local file system textData.write_as_text("file:///my/result/on/localFS") write DataStream to a file on a HDFS with a namenode running at nnHost:nnPort textData.write_as_text("hdfs://nnHost:nnPort/my/result/on/localFS") write DataStream to a file and overwrite the file if it exists textData.write_as_text("file:///my/result/on/localFS", WriteMode.OVERWRITE) this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines values.write_as_text("file:///path/to/the/result/file") ``` ## 并行执行 本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务( 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为_并行__度_或_并行__度(DOP)_。 可以在不同级别的Flink中指定任务的并行度。 ### 运行环境级别 Flink程序在[运行环境](#program-skeleton)的上下文中[执行](#program-skeleton)。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。 可以通过调用`set_parallelism()`方法来指定运行环境的默认并行性 。要以并行方式执行[WordCount](#example-program)示例程序的所有 算子,数据源和数据接收器,请 `3`按如下方式设置运行环境的默认并行度: ``` env = factory.get_execution_environment() env.set_parallelism(3) text.flat_map(Tokenizer()) \ .key_by(Selector()) \ .time_window(milliseconds(30)) \ .reduce(Sum()) \ .print() env.execute() ``` ### 系统级别 可以通过设置`parallelism.default`属性来定义所有运行环境的系统范围默认并行度 `./conf/flink-conf.yaml`。有关详细信息,请参阅 [配置](https://flink.sojb.cn/ops/config.html)文档 ## 执行计划 要使用Flink运行计划,请转到Flink分发,然后从/ bin文件夹运行pyflink-stream.sh脚本。包含该计划的脚本必须作为第一个参数传递,然后是许多其他Python包,最后由`-`将被提供给脚本的其他参数分隔。 ``` ./bin/pyflink-stream.sh