# 并行执行 > 译者:[flink.sojb.cn](https://flink.sojb.cn/) 本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务(转换/ 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为_并行性_。 如果要使用[保存点](https://flink.sojb.cn/ops/state/savepoints.html),还应考虑设置最大并行度(或`max parallelism`)。从保存点恢复时,您可以更改特定 算子或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为Flink在内部将状态划分为Keys组,并且我们不能拥有`+Inf`多个Keys组,因为这会对性能产生不利影响。 ## 设置并行性 可以在不同级别的Flink中指定任务的并行性: ### 算子级别 可以通过调用其`setParallelism()`方法来定义单个 算子,数据源或数据接收器的并行性 。例如,像这样: * [**Java**](#tab_java_0) * [**Scala**](#tab_scala_0) ``` final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream text = [...] DataStream> wordCounts = text .flatMap(new LineSplitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1).setParallelism(5); wordCounts.print(); env.execute("Word Count Example"); ``` ``` val env = StreamExecutionEnvironment.getExecutionEnvironment val text = [...] val wordCounts = text .flatMap{ _.split(" ") map { (_, 1) } } .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1).setParallelism(5) wordCounts.print() env.execute("Word Count Example") ``` ### 运行环境级别 如此处所述[,](https://flink.sojb.cn/dev/api_concepts.html#anatomy-of-a-flink-program) Flink程序在运行环境的上下文中执行。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。 可以通过调用`setParallelism()`方法来指定运行环境的默认并行性 。要以并行方式执行所有 算子,数据源和数据接收器,请`3`按如下方式设置运行环境的默认并行度: * [**Java**](#tab_java_1) * [**Scala**](#tab_scala_1) ``` final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3); DataStream text = [...] DataStream> wordCounts = [...] wordCounts.print(); env.execute("Word Count Example"); ``` ``` val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(3) val text = [...] val wordCounts = text .flatMap{ _.split(" ") map { (_, 1) } } .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) wordCounts.print() env.execute("Word Count Example") ``` ### 客户级别 在向Flink提交作业时,可以在客户端设置并行性。客户端可以是Java或Scala程序。这种客户端的一个例子是Flink的命令行界面(CLI)。 对于CLI客户端,可以使用指定parallelism参数`-p`。例如: ``` ./bin/flink run -p 10 ../examples/*WordCount-java*.jar ``` 在Java / Scala程序中,并行性设置如下: * [**Java**](#tab_java_2) * [**Scala**](#tab_scala_2) ``` try { PackagedProgram program = new PackagedProgram(file, args); InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123"); Configuration config = new Configuration(); Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader()); // set the parallelism to 10 here client.run(program, 10, true); } catch (ProgramInvocationException e) { e.printStackTrace(); } ``` ``` try { PackagedProgram program = new PackagedProgram(file, args) InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123") Configuration config = new Configuration() Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader()) // set the parallelism to 10 here client.run(program, 10, true) } catch { case e: Exception => e.printStackTrace } ``` ### 系统级别 可以通过设置`parallelism.default`属性来定义所有运行环境的系统范围默认并行度 `./conf/flink-conf.yaml`。有关详细信息,请参阅 [配置](https://flink.sojb.cn/ops/config.html)文档 ## 设置最大并行度 可以在可以设置并行度的位置设置最大并行度(客户端级别和系统级别除外)。而不是调用,`setParallelism()`你调用 `setMaxParallelism()`设置最大并行度。 最大并行度的默认设置大致`operatorParallelism + (operatorParallelism / 2)`为下限`127`和上限`32768`。 注意将最大并行度设置为非常大的值可能对性能有害,因为某些状态后台必须保持内部数据结构随Keys组的数量(这是可重新缓存状态的内部实现机制)进行扩展。