70.md 4.9 KB
Newer Older
W
wizardforcel 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169


# 并行执行

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)


本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务(转换/ 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为_并行性_。

如果要使用[保存点](https://flink.sojb.cn/ops/state/savepoints.html),还应考虑设置最大并行度(或`max parallelism`)。从保存点恢复时,您可以更改特定 算子或整个程序的并行度,此设置指定并行度的上限。这是必需的,因为Flink在内部将状态划分为Keys组,并且我们不能拥有`+Inf`多个Keys组,因为这会对性能产生不利影响。

## 设置并行性

可以在不同级别的Flink中指定任务的并行性:

### 算子级别

可以通过调用其`setParallelism()`方法来定义单个 算子,数据源或数据接收器的并行性 。例如,像这样:

*   [**Java**](#tab_java_0)
*   [**Scala**](#tab_scala_0)



```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");
```





```
val env = StreamExecutionEnvironment.getExecutionEnvironment

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1).setParallelism(5)
wordCounts.print()

env.execute("Word Count Example")
```



### 运行环境级别

如此处所述[](https://flink.sojb.cn/dev/api_concepts.html#anatomy-of-a-flink-program) Flink程序在运行环境的上下文中执行。运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。

可以通过调用`setParallelism()`方法来指定运行环境的默认并行性 。要以并行方式执行所有 算子,数据源和数据接收器,请`3`按如下方式设置运行环境的默认并行度:

*   [**Java**](#tab_java_1)
*   [**Scala**](#tab_scala_1)



```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");
```





```
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)

val text = [...]
val wordCounts = text
    .flatMap{ _.split(" ") map { (_, 1) } }
    .keyBy(0)
    .timeWindow(Time.seconds(5))
    .sum(1)
wordCounts.print()

env.execute("Word Count Example")
```



### 客户级别

在向Flink提交作业时,可以在客户端设置并行性。客户端可以是Java或Scala程序。这种客户端的一个例子是Flink的命令行界面(CLI)。

对于CLI客户端,可以使用指定parallelism参数`-p`。例如:

```
./bin/flink run -p 10 ../examples/*WordCount-java*.jar 
```

在Java / Scala程序中,并行性设置如下:

*   [**Java**](#tab_java_2)
*   [**Scala**](#tab_scala_2)



```
try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
```





```
try {
    PackagedProgram program = new PackagedProgram(file, args)
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123")
    Configuration config = new Configuration()

    Client client = new Client(jobManagerAddress, new Configuration(), program.getUserCodeClassLoader())

    // set the parallelism to 10 here
    client.run(program, 10, true)

} catch {
    case e: Exception => e.printStackTrace
}
```



### 系统级别

可以通过设置`parallelism.default`属性来定义所有运行环境的系统范围默认并行度 `./conf/flink-conf.yaml`。有关详细信息,请参阅 [配置](https://flink.sojb.cn/ops/config.html)文档

## 设置最大并行度

可以在可以设置并行度的位置设置最大并行度(客户端级别和系统级别除外)。而不是调用,`setParallelism()`你调用 `setMaxParallelism()`设置最大并行度。

最大并行度的默认设置大致`operatorParallelism + (operatorParallelism / 2)`为下限`127`和上限`32768`

注意将最大并行度设置为非常大的值可能对性能有害,因为某些状态后台必须保持内部数据结构随Keys组的数量(这是可重新缓存状态的内部实现机制)进行扩展。