# Operators

> Translator: [flink.sojb.cn](https://flink.sojb.cn/)


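Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

This section describes the basic transformations, the effective physical partitioning after applying them, and Flink's operator chaining.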

# DataStream Transformations


**Java**


---

Transformation: **Map** DataStream → DataStream

Description: Takes one element and produces one element. A map function that doubles the values of the input stream:

```
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
    @Override
    public Integer map(Integer value) throws Exception {
        return 2 * value;
    }
}); 
```



---

Transformation: **FlatMap** DataStream → DataStream

Description: Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

```
dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
        throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
}); 
```



---

Transformation: **Filter** DataStream → DataStream

Description: Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

```
dataStream.filter(new FilterFunction<Integer>() {
    @Override
    public boolean filter(Integer value) throws Exception {
        return value != 0;
    }
}); 
```



---

Transformation: **KeyBy** DataStream → KeyedStream

Description: Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, _keyBy()_ is implemented with hash partitioning. There are different ways to [specify keys](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys). This transformation returns a _KeyedStream_, which is, among other things, required to use [keyed state](https://flink.sojb.cn/dev/stream/state/state.html#keyed-state).

```
dataStream.keyBy("someKey"); // Key by field "someKey"
dataStream.keyBy(0); // Key by the first element of a Tuple
```
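The hash partitioning described above can be illustrated with a small plain-Java model. This is a simplified sketch and not Flink code. It assumes `partition = floorMod(key.hashCode(), parallelism)`, whereas Flink, as we understand it, first assigns keys to key groups. `SensorKey` is a hypothetical POJO key. The sketch shows why a key type needs a value-based `hashCode()`: two equal keys must always land in the same partition.

```java
import java.util.Objects;

// Simplified model of hash partitioning (not Flink's implementation).
// Assumption: partition = floorMod(key.hashCode(), parallelism).
class HashPartitionSketch {

    // Hypothetical POJO key that overrides equals() and hashCode().
    static final class SensorKey {
        final String site;
        final int id;

        SensorKey(String site, int id) {
            this.site = site;
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof SensorKey)) return false;
            SensorKey k = (SensorKey) o;
            return id == k.id && site.equals(k.site);
        }

        @Override
        public int hashCode() {
            return Objects.hash(site, id); // value-based, so equal keys hash equally
        }
    }

    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        // Two distinct objects with equal field values go to the same partition.
        System.out.println(partitionFor(new SensorKey("a", 1), 4) == partitionFor(new SensorKey("a", 1), 4));
        // Arrays inherit the identity-based Object.hashCode(), so two arrays
        // with equal contents usually get different hash codes.
        int[] a = {1, 2};
        int[] b = {1, 2};
        System.out.println(a.hashCode() == b.hashCode());
    }
}
```

Arrays, and POJOs that do not override `hashCode()`, fall back to identity hash codes. With those types, equal values can be routed to different partitions, which is why they cannot serve as keys.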


Attention: A type **cannot be a key** if:

1.  it is a POJO type but does not override the _hashCode()_ method and relies on the _Object.hashCode()_ implementation.
2.  it is an array of any type.

---

Transformation: **Reduce** KeyedStream → DataStream

Description: A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:


```
keyedStream.reduce(new ReduceFunction<Integer>() {
    @Override
    public Integer reduce(Integer value1, Integer value2)
    throws Exception {
        return value1 + value2;
    }
}); 
```
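The following plain-Java simulation makes the "rolling" semantics concrete. It is not the Flink API. Per-key state holds the last reduced value, and every input record produces exactly one output. The `rollingReduce` helper and the sample records are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model (not the Flink API) of a rolling reduce on a keyed stream.
class RollingReduceSketch {

    static <K, V> List<V> rollingReduce(List<Map.Entry<K, V>> records, BinaryOperator<V> reducer) {
        Map<K, V> lastReduced = new HashMap<>(); // per-key state
        List<V> emitted = new ArrayList<>();
        for (Map.Entry<K, V> r : records) {
            V prev = lastReduced.get(r.getKey());
            // The first element of a key is emitted as-is; later ones are combined.
            V next = (prev == null) ? r.getValue() : reducer.apply(prev, r.getValue());
            lastReduced.put(r.getKey(), next);
            emitted.add(next); // one output per input record
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> in = List.of(
            Map.entry("a", 1), Map.entry("b", 10), Map.entry("a", 2),
            Map.entry("a", 3), Map.entry("b", 5));
        System.out.println(rollingReduce(in, Integer::sum)); // [1, 10, 3, 6, 15]
    }
}
```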


---

Transformation: **Fold** KeyedStream → DataStream

Description: A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...


```
DataStream<String> result =
  keyedStream.fold("start", new FoldFunction<Integer, String>() {
    @Override
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
  }); 
```
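The same rolling behaviour for fold is sketched below in plain Java for a single key. The `rollingFold` helper is hypothetical and not part of Flink. Unlike reduce, the accumulator starts from an initial value and can have a different type from the elements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java model (not the Flink API) of a rolling fold for one key:
// start from an initial accumulator and emit it after every element.
class RollingFoldSketch {

    static <T, R> List<R> rollingFold(R initial, List<T> values, BiFunction<R, T, R> folder) {
        List<R> emitted = new ArrayList<>();
        R acc = initial;
        for (T v : values) {
            acc = folder.apply(acc, v);
            emitted.add(acc);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(rollingFold("start", List.of(1, 2, 3), (s, i) -> s + "-" + i));
        // [start-1, start-1-2, start-1-2-3]
    }
}
```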



---

Transformation: **Aggregations** KeyedStream → DataStream

Description: Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

```
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key"); 
```
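The difference between min and minBy can be seen in a plain-Java model for a single key. This is not the Flink API. As a simplification, "min" is modeled as tracking only the minimum field value, while "minBy" tracks the whole element that holds it. The `Reading` type is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not the Flink API) contrasting rolling min and minBy for one key.
class MinVsMinBySketch {

    // Hypothetical element type.
    static final class Reading {
        final String sensor;
        final int temperature;

        Reading(String sensor, int temperature) {
            this.sensor = sensor;
            this.temperature = temperature;
        }
    }

    // Rolling min: emits the smallest temperature seen so far.
    static List<Integer> rollingMin(List<Reading> in) {
        List<Integer> out = new ArrayList<>();
        Integer min = null;
        for (Reading r : in) {
            min = (min == null) ? r.temperature : Math.min(min, r.temperature);
            out.add(min);
        }
        return out;
    }

    // Rolling minBy: emits the whole element holding the smallest temperature so far.
    // A strict "<" keeps the earlier element when values tie.
    static List<Reading> rollingMinBy(List<Reading> in) {
        List<Reading> out = new ArrayList<>();
        Reading best = null;
        for (Reading r : in) {
            if (best == null || r.temperature < best.temperature) best = r;
            out.add(best);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Reading> in = List.of(new Reading("s1", 30), new Reading("s2", 25), new Reading("s3", 27));
        System.out.println(rollingMin(in)); // [30, 25, 25]
        for (Reading r : rollingMinBy(in)) System.out.print(r.sensor + " "); // s1 s2 s2
        System.out.println();
    }
}
```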



---

Transformation: **Window** KeyedStream → WindowedStream

Description: Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a complete description of windows.

```
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data 
```
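A tumbling window has a fixed size, so the window an event falls into can be computed from its timestamp alone. The plain-Java sketch below assumes windows aligned to the epoch plus an optional offset, with timestamps in milliseconds. It illustrates the idea and is not taken from Flink's source.

```java
// Plain-Java sketch of assigning an event timestamp (ms) to a tumbling window
// [start, start + size). Assumption: windows are aligned to epoch + offset.
class TumblingWindowSketch {

    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the result correct for timestamps before the offset
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long size = 5_000; // 5-second windows, as in Time.seconds(5)
        System.out.println(windowStart(12_345, 0, size)); // 10000
        System.out.println(windowStart(4_999, 0, size));  // 0
        System.out.println(windowStart(5_000, 0, size));  // 5000
    }
}
```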



---

Transformation: **WindowAll** DataStream → AllWindowedStream

Description: Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a complete description of windows. **WARNING:** This is in many cases a **non-parallel** transformation. All records will be gathered in one task for the windowAll operator.

```
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data 
```



---

Transformation: **Window Apply** WindowedStream → DataStream, AllWindowedStream → DataStream

Description: Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window. **Note:** If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

```
// Assumes time-based windows (TimeWindow), e.g. from TumblingEventTimeWindows, and a Tuple key
windowedStream.apply(new WindowFunction<Tuple2<String, Integer>, Integer, Tuple, TimeWindow>() {
    @Override
    public void apply(Tuple key,
            TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1; // add up the second tuple field
        }
        out.collect(sum);
    }
});

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply(new AllWindowFunction<Tuple2<String, Integer>, Integer, TimeWindow>() {
    @Override
    public void apply(TimeWindow window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (Tuple2<String, Integer> t : values) {
            sum += t.f1;
        }
        out.collect(sum);
    }
});
```



---

Transformation: **Window Reduce** WindowedStream → DataStream

Description: Applies a functional reduce function to the window and returns the reduced value.

```
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
        return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
    }
}); 
```
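In contrast to the rolling reduce on a KeyedStream, a window reduce emits one value per window rather than one per element. The plain-Java sketch below models this for tumbling windows of a single key. It is not the Flink API. Elements are hypothetical `{timestampMillis, value}` pairs.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Plain-Java model (not the Flink API) of a reduce over tumbling windows:
// each window [start, start + size) produces a single reduced value.
class WindowReduceSketch {

    // Each element is {timestampMillis, value}.
    static Map<Long, Integer> reducePerWindow(List<long[]> elements, long size, BinaryOperator<Integer> reducer) {
        Map<Long, Integer> result = new TreeMap<>(); // ordered by window start
        for (long[] e : elements) {
            long start = e[0] - Math.floorMod(e[0], size);
            result.merge(start, (int) e[1], reducer); // combine with the window's current value
        }
        return result;
    }

    public static void main(String[] args) {
        List<long[]> in = List.of(new long[]{1000, 1}, new long[]{2000, 2},
            new long[]{6000, 3}, new long[]{7000, 4}, new long[]{9000, 5});
        System.out.println(reducePerWindow(in, 5000, Integer::sum)); // {0=3, 5000=12}
    }
}
```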



---

Transformation: **Window Fold** WindowedStream → DataStream

Description: Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

```
windowedStream.fold("start", new FoldFunction<Integer, String>() {
    public String fold(String current, Integer value) {
        return current + "-" + value;
    }
}); 
```



---

Transformation: **Aggregations on windows** WindowedStream → DataStream

Description: Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

```
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key"); 
```



---

Transformation: **Union** DataStream* → DataStream

Description: Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will get each element twice in the resulting stream.

```
dataStream.union(otherStream1, otherStream2, ...); 
```



---

Transformation: **Window Join** DataStream,DataStream → DataStream

Description: Joins two data streams on a given key and a common window.

```
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...}); 
```
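The join condition can be modeled in plain Java: two elements are paired when they share a key and fall into the same tumbling window. This is not the Flink API. The nested loop is only for clarity, whereas a real job buffers elements per window. `Event` and the string concatenation acting as the "join function" are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not the Flink API) of a tumbling-window inner join.
class WindowJoinSketch {

    static final class Event {
        final String key;
        final long ts;
        final String payload;

        Event(String key, long ts, String payload) {
            this.key = key;
            this.ts = ts;
            this.payload = payload;
        }
    }

    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    static List<String> join(List<Event> left, List<Event> right, long size) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                // same key AND same window -> the pair is passed to the join function
                if (l.key.equals(r.key) && windowStart(l.ts, size) == windowStart(r.ts, size)) {
                    out.add(l.payload + "," + r.payload);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("k", 1_000, "L1"), new Event("k", 4_000, "L2"));
        List<Event> right = List.of(new Event("k", 2_000, "R1"), new Event("k", 3_500, "R2"),
            new Event("x", 1_500, "R3"));
        System.out.println(join(left, right, 3_000)); // [L1,R1, L2,R2]
    }
}
```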



---

Transformation: **Interval Join** KeyedStream,KeyedStream → DataStream

Description: Joins two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that `e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound`.

```
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive() // optional
    .lowerBoundExclusive() // optional
    .process(new ProcessJoinFunction() {...});
```
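The interval-join condition for two elements that already share a key reduces to a timestamp predicate. The plain-Java sketch below is a model of that predicate, not the Flink API. The two boolean flags play the role of making either bound exclusive.

```java
// Plain-Java sketch of the interval-join predicate for two same-key elements.
// Bounds are relative to the left element's timestamp.
class IntervalJoinSketch {

    static boolean matches(long leftTs, long rightTs, long lowerBound, long upperBound,
                           boolean lowerExclusive, boolean upperExclusive) {
        long lo = leftTs + lowerBound;
        long hi = leftTs + upperBound;
        boolean aboveLower = lowerExclusive ? rightTs > lo : rightTs >= lo;
        boolean belowUpper = upperExclusive ? rightTs < hi : rightTs <= hi;
        return aboveLower && belowUpper;
    }

    public static void main(String[] args) {
        // Bounds -2 ms and +2 ms, both exclusive: leftTs - 2 < rightTs < leftTs + 2
        System.out.println(matches(10, 11, -2, 2, true, true));   // true
        System.out.println(matches(10, 12, -2, 2, true, true));   // false (upper bound excluded)
        System.out.println(matches(10, 12, -2, 2, false, false)); // true (inclusive bounds)
    }
}
```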



---

Transformation: **Window CoGroup** DataStream,DataStream → DataStream

Description: Cogroups two data streams on a given key and a common window.

```
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...}); 
```
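A coGroup differs from a join in what the user function receives. For each (key, window) group it sees all left elements and all right elements together, even when one side is empty, whereas a join only produces matching pairs. The plain-Java sketch below models this by counting both sides per group. It is not the Flink API, and `Event` and the group label format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (not the Flink API) of a tumbling-window coGroup.
class CoGroupSketch {

    static final class Event {
        final String key;
        final long ts;

        Event(String key, long ts) {
            this.key = key;
            this.ts = ts;
        }
    }

    static String groupId(Event e, long size) {
        return e.key + "@" + (e.ts - Math.floorMod(e.ts, size)); // key + window start
    }

    // Returns "key@windowStart -> leftCount/rightCount" for every non-empty group.
    static List<String> coGroupCounts(List<Event> left, List<Event> right, long size) {
        Map<String, int[]> groups = new TreeMap<>(); // sorted for deterministic output
        for (Event e : left) groups.computeIfAbsent(groupId(e, size), g -> new int[2])[0]++;
        for (Event e : right) groups.computeIfAbsent(groupId(e, size), g -> new int[2])[1]++;
        List<String> out = new ArrayList<>();
        groups.forEach((g, c) -> out.add(g + " -> " + c[0] + "/" + c[1]));
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("a", 1000), new Event("a", 2000), new Event("b", 1000));
        List<Event> right = List.of(new Event("a", 1500));
        System.out.println(coGroupCounts(left, right, 5000)); // [a@0 -> 2/1, b@0 -> 1/0]
    }
}
```

The group `b@0` has no right-hand elements but still reaches the function. An inner window join would produce nothing for it.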



---

Transformation: **Connect** DataStream,DataStream → ConnectedStreams

Description: "Connects" two data streams, retaining their types. Connecting allows for shared state between the two streams.

```
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...

ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); 
```



---

Transformation: **CoMap, CoFlatMap** ConnectedStreams → DataStream

Description: Similar to map and flatMap on a connected data stream.

```
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
}); 
```
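Shared state between the two inputs is possible because both callbacks belong to one function object. The plain-Java model below illustrates this and is not the Flink API. One hypothetical "control" input adds words to a blocklist, and the other input emits the words of each sentence that are not blocked.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model (not the Flink API) of a CoFlatMap with shared state.
class ConnectedStreamsSketch {

    private final Set<String> blocked = new HashSet<>(); // shared by both inputs
    final List<String> out = new ArrayList<>();

    // Input 1 (hypothetical control stream): block a word from now on.
    void flatMap1(String wordToBlock) {
        blocked.add(wordToBlock);
    }

    // Input 2 (hypothetical data stream): split sentences, drop blocked words.
    void flatMap2(String sentence) {
        for (String word : sentence.split(" ")) {
            if (!blocked.contains(word)) out.add(word);
        }
    }

    public static void main(String[] args) {
        ConnectedStreamsSketch op = new ConnectedStreamsSketch();
        op.flatMap2("hello big world");
        op.flatMap1("big");
        op.flatMap2("big hello");
        System.out.println(op.out); // [hello, big, world, hello]
    }
}
```

In a real job the interleaving of the two inputs is not controlled by the program, so logic like this should not rely on one input arriving before the other.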



---

Transformation: **Split** DataStream → SplitStream

Description: Splits the stream into two or more streams according to some criterion.

```
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>();
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
}); 
```



---

Transformation: **Select** SplitStream → DataStream

Description: Selects one or more streams from a split stream.

```
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd"); 
```



---

Transformation: **Iterate** DataStream → IterativeStream → DataStream

Description: Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](#iterations) for a complete description.

```
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0; // positive values go back into the loop
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
}); 
```
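The feedback loop can be simulated in plain Java with a queue that stands for both the initial stream and the feedback channel. This is a model, not the Flink API. The iteration body `x -> x - 2` is hypothetical. Note that a body that never drives values to `<= 0` would keep elements circulating forever.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongUnaryOperator;

// Plain-Java model (not the Flink API) of a streaming iteration:
// results > 0 are fed back, results <= 0 leave the loop.
class IterationSketch {

    static List<Long> iterate(List<Long> input, LongUnaryOperator body) {
        Deque<Long> queue = new ArrayDeque<>(input); // initial stream + feedback channel
        List<Long> output = new ArrayList<>();
        while (!queue.isEmpty()) {
            long result = body.applyAsLong(queue.poll());
            if (result > 0) {
                queue.add(result);   // feedback edge (closeWith)
            } else {
                output.add(result);  // forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(iterate(List.of(5L, 1L, 2L), x -> x - 2)); // [-1, 0, -1]
    }
}
```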



---

Transformation: **Extract Timestamps** DataStream → DataStream

Description: Extracts timestamps from records in order to work with windows that use event time semantics. See [Event Time](https://flink.sojb.cn/dev/event_time.html).

```
stream.assignTimestamps (new TimeStampExtractor() {...}); 
```




**Scala**

---

Transformation: **Map** DataStream → DataStream

Description: Takes one element and produces one element. A map function that doubles the values of the input stream:

```
dataStream.map { x => x * 2 } 
```



---

Transformation: **FlatMap** DataStream → DataStream

Description: Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences into words:

```
dataStream.flatMap { str => str.split(" ") } 
```



---

Transformation: **Filter** DataStream → DataStream

Description: Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

```
dataStream.filter { _ != 0 } 
```



---

Transformation: **KeyBy** DataStream → KeyedStream

Description: Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See [keys](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys) on how to specify keys. This transformation returns a KeyedStream.

```
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
```


---

Transformation: **Reduce** KeyedStream → DataStream

Description: A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:


```
keyedStream.reduce { _ + _ } 
```


---

Transformation: **Fold** KeyedStream → DataStream

Description: A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...


```
val result: DataStream[String] =
    keyedStream.fold("start")((str, i) => { str + "-" + i }) 
```



---

Transformation: **Aggregations** KeyedStream → DataStream

Description: Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

```
keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key") 
```



---

Transformation: **Window** KeyedStream → WindowedStream

Description: Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a description of windows.

```
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data 
```



---

Transformation: **WindowAll** DataStream → AllWindowedStream

Description: Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a complete description of windows. **WARNING:** This is in many cases a **non-parallel** transformation. All records will be gathered in one task for the windowAll operator.

```
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data 
```



---

Transformation: **Window Apply** WindowedStream → DataStream, AllWindowedStream → DataStream

Description: Applies a general function to the window as a whole. **Note:** If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

```
windowedStream.apply { WindowFunction }

// applying an AllWindowFunction on non-keyed window stream
allWindowedStream.apply { AllWindowFunction }
```



---

Transformation: **Window Reduce** WindowedStream → DataStream

Description: Applies a functional reduce function to the window and returns the reduced value.

```
windowedStream.reduce { _ + _ } 
```



---

Transformation: **Window Fold** WindowedStream → DataStream

Description: Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5":

```
val result: DataStream[String] =
    windowedStream.fold("start", (str, i) => { str + "-" + i }) 
```



---

Transformation: **Aggregations on windows** WindowedStream → DataStream

Description: Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

```
windowedStream.sum(0)
windowedStream.sum("key")
windowedStream.min(0)
windowedStream.min("key")
windowedStream.max(0)
windowedStream.max("key")
windowedStream.minBy(0)
windowedStream.minBy("key")
windowedStream.maxBy(0)
windowedStream.maxBy("key") 
```



---

Transformation: **Union** DataStream* → DataStream

Description: Union of two or more data streams, creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself, you will get each element twice in the resulting stream.

```
dataStream.union(otherStream1, otherStream2, ...) 
```



---

Transformation: **Window Join** DataStream,DataStream → DataStream

Description: Joins two data streams on a given key and a common window.

```
dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply { ... } 
```



---

Transformation: **Window CoGroup** DataStream,DataStream → DataStream

Description: Cogroups two data streams on a given key and a common window.

```
dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply {} 
```



---

Transformation: **Connect** DataStream,DataStream → ConnectedStreams

Description: "Connects" two data streams, retaining their types, allowing for shared state between the two streams.

```
val someStream: DataStream[Int] = ...
val otherStream: DataStream[String] = ...

val connectedStreams = someStream.connect(otherStream) 
```



---

Transformation: **CoMap, CoFlatMap** ConnectedStreams → DataStream

Description: Similar to map and flatMap on a connected data stream.

```
connectedStreams.map(
    (_ : Int) => true,
    (_ : String) => false
)
connectedStreams.flatMap(
    (i: Int) => List(i.toString),       // one output per Int
    (s: String) => s.split(" ").toSeq   // one output per word
)
```



---

Transformation: **Split** DataStream → SplitStream

Description: Splits the stream into two or more streams according to some criterion.

```
val split = someDataStream.split(
  (num: Int) =>
    (num % 2) match {
      case 0 => List("even")
      case _ => List("odd") // covers 1 and -1 (negative odd numbers)
    }
) 
```



---

Transformation: **Select** SplitStream → DataStream

Description: Selects one or more streams from a split stream.

```
val even = split select "even"
val odd = split select "odd"
val all = split.select("even","odd") 
```



---

Transformation: **Iterate** DataStream → IterativeStream → DataStream

Description: Creates a "feedback" loop in the flow by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](#iterations) for a complete description.

```
initialStream.iterate {
  iteration => {
    val iterationBody = iteration.map {/*do something*/}
    (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
  }
} 
```



---

Transformation: **Extract Timestamps** DataStream → DataStream

Description: Extracts timestamps from records in order to work with windows that use event time semantics. See [Event Time](https://flink.sojb.cn/dev/event_time.html).

```
stream.assignTimestamps { timestampExtractor } 
```



Extraction from tuples, case classes and collections via anonymous pattern matching, like the following:



```
val data: DataStream[(Int, String, Double)] = // [...]
data.map {
  case (id, name, temperature) => // [...]
}
```



is not supported by the API out-of-the-box. To use this feature, you should use a [Scala API extension](https://flink.sojb.cn/dev/scala_api_extensions.html).

The following transformations are available on data streams of Tuples:

**Java**


---

Transformation: **Project** DataStream → DataStream

Description: Selects a subset of fields from the tuples.

```
DataStream<Tuple3<Integer, Double, String>> in = // [...]
DataStream<Tuple2<String, Integer>> out = in.project(2,0);
```



# Physical partitioning


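Flink also gives low-level control (if desired) over the exact stream partitioning after a transformation, via the following functions.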

**Java**


---

Transformation: **Custom partitioning** DataStream → DataStream

Description: Uses a user-defined Partitioner to select the target task for each element.

```
dataStream.partitionCustom(partitioner, "someKey");
dataStream.partitionCustom(partitioner, 0); 
```
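A custom partitioner is essentially a function from (key, number of downstream tasks) to a task index. The plain-Java sketch below defines a hypothetical `SimplePartitioner` interface. Its shape follows our understanding of Flink's `Partitioner`, which declares `int partition(K key, int numPartitions)`. The range-based policy is likewise hypothetical. This is not Flink code.

```java
// Hypothetical stand-in for a user-defined partitioner:
// map a key and the number of downstream tasks to a target task index.
interface SimplePartitioner<K> {
    int partition(K key, int numPartitions);
}

class CustomPartitionSketch {

    // Hypothetical range policy for non-negative int keys: 0-99 -> task 0,
    // 100-199 -> task 1, ...; keys beyond the last task are clamped to it.
    static final SimplePartitioner<Integer> RANGE_BY_HUNDREDS =
        (key, numPartitions) -> Math.min(key / 100, numPartitions - 1);

    public static void main(String[] args) {
        System.out.println(RANGE_BY_HUNDREDS.partition(42, 4));    // 0
        System.out.println(RANGE_BY_HUNDREDS.partition(250, 4));   // 2
        System.out.println(RANGE_BY_HUNDREDS.partition(9_999, 4)); // 3
    }
}
```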



---

Transformation: **Random partitioning** DataStream → DataStream

Description: Partitions elements randomly according to a uniform distribution.

```
dataStream.shuffle(); 
```



---

Transformation: **Rebalancing (Round-robin partitioning)** DataStream → DataStream

Description: Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

```
dataStream.rebalance(); 
```
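Round-robin selection is what gives each partition equal load regardless of the data's keys. The plain-Java model below illustrates it and is not Flink's partitioner class. The starting channel is a parameter here, and Flink's actual choice of starting channel may differ.

```java
// Plain-Java model of round-robin channel selection as used by rebalance():
// each element goes to the next downstream channel in turn.
class RoundRobinSketch {

    private final int numChannels;
    private int nextChannel;

    RoundRobinSketch(int numChannels, int startChannel) {
        this.numChannels = numChannels;
        this.nextChannel = startChannel % numChannels;
    }

    int selectChannel() {
        int channel = nextChannel;
        nextChannel = (nextChannel + 1) % numChannels; // wrap around
        return channel;
    }

    public static void main(String[] args) {
        RoundRobinSketch rr = new RoundRobinSketch(3, 0);
        int[] load = new int[3];
        for (int i = 0; i < 9; i++) load[rr.selectChannel()]++;
        System.out.println(java.util.Arrays.toString(load)); // [3, 3, 3]
    }
}
```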



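---

Transformation: **Rescaling** DataStream → DataStream

Description: Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want pipelines where, for example, you fan out from each parallel instance of a source to a subset of several mappers to distribute load, but don't want the full rebalance that rebalance() would incur. Depending on other configuration values, such as the number of slots of the TaskManagers, this can require only local data transfers instead of transferring data over the network.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 6, then one upstream operation would distribute elements to three downstream operations while the other upstream operation would distribute to the other three downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 6, then three upstream operations would distribute to one downstream operation while the other three upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations. Please see this figure for a visualization of the rescale connection pattern:

![Rescale connection pattern](../img/rescale.svg)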

```
dataStream.rescale(); 
```
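The wiring in the examples above can be reproduced with a simplified plain-Java model. This is an assumption and not Flink's scheduling code. Each upstream instance is wired to a contiguous subset of downstream instances and round-robins only within that subset. When parallelism shrinks, several upstream instances share one downstream instance.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (an assumption, not Flink's scheduler) of the pointwise
// connection pattern used by rescale().
class RescaleSketch {

    // Downstream instance indices that upstream instance `up` sends to.
    static List<Integer> targets(int up, int upstreamParallelism, int downstreamParallelism) {
        List<Integer> result = new ArrayList<>();
        if (downstreamParallelism >= upstreamParallelism) {
            // fan-out: split the downstream instances into contiguous ranges
            int from = up * downstreamParallelism / upstreamParallelism;
            int to = (up + 1) * downstreamParallelism / upstreamParallelism;
            for (int d = from; d < to; d++) result.add(d);
        } else {
            // fan-in: several upstream instances share one downstream instance
            result.add(up * downstreamParallelism / upstreamParallelism);
        }
        return result;
    }

    public static void main(String[] args) {
        for (int up = 0; up < 2; up++) System.out.println(up + " -> " + targets(up, 2, 6));
        for (int up = 0; up < 6; up++) System.out.println(up + " -> " + targets(up, 6, 2));
    }
}
```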



---

Transformation: **Broadcasting** DataStream → DataStream

Description: Broadcasts elements to every partition.

```
dataStream.broadcast(); 
```




**Scala**

---

Transformation: **Custom partitioning** DataStream → DataStream

Description: Uses a user-defined Partitioner to select the target task for each element.

```
dataStream.partitionCustom(partitioner, "someKey")
dataStream.partitionCustom(partitioner, 0) 
```



---

Transformation: **Random partitioning** DataStream → DataStream

Description: Partitions elements randomly according to a uniform distribution.

```
dataStream.shuffle() 
```



---

Transformation: **Rebalancing (Round-robin partitioning)** DataStream → DataStream

Description: Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew.

```
dataStream.rebalance() 
```



---

Transformation: **Rescaling** DataStream → DataStream

Description: Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want pipelines where, for example, you fan out from each parallel instance of a source to a subset of several mappers to distribute load, but don't want the full rebalance that rebalance() would incur. Depending on other configuration values, such as the number of slots of the TaskManagers, this can require only local data transfers instead of transferring data over the network.

The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4, then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operation.

In cases where the different parallelisms are not multiples of each other, one or several downstream operations will have a differing number of inputs from upstream operations. Please see this figure for a visualization of the connection pattern in the above example:

![Rescale connection pattern](../img/rescale.svg)

```
dataStream.rescale() 
```



---

Transformation: **Broadcasting** DataStream → DataStream

Description: Broadcasts elements to every partition.

```
dataStream.broadcast() 
```



# Task chaining and resource groups


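Chaining two subsequent transformations means co-locating them within the same thread for better performance. Flink chains operators by default if this is possible (e.g., two subsequent map transformations). The API gives fine-grained control over chaining if desired:

Use `StreamExecutionEnvironment.disableOperatorChaining()` if you want to disable chaining in the whole job. For more fine-grained control, the following functions are available. Note that these functions can only be used right after a DataStream transformation, as they refer to the previous transformation. For example, you can use `someStream.map(...).startNewChain()`, but you cannot use `someStream.startNewChain()`.

A resource group is a slot in Flink; see [slots](https://flink.sojb.cn/ops/config.html#configuring-taskmanager-processing-slots). You can manually isolate operators in separate slots if desired.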

**Java**


---

Transformation: Start new chain

Description: Begin a new chain, starting with this operator. The two mappers will be chained, and the filter will not be chained to the first mapper.

```
someStream.filter(...).map(...).startNewChain().map(...);
```



---

Transformation: Disable chaining

Description: Do not chain the map operator.

```
someStream.map(...).disableChaining();
```



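---

Transformation: Set slot sharing group

Description: Sets the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot, while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").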

```
someStream.filter(...).slotSharingGroup("name");
```
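The inheritance rule in the description can be written as a small decision function. This plain-Java sketch reflects our reading of that rule and is not Flink's source. In particular, the fallback to "default" when input groups disagree or when there are no inputs is an assumption the text above does not state explicitly.

```java
import java.util.List;

// Plain-Java sketch of slot sharing group resolution: an explicit group wins;
// otherwise inherit the inputs' group if they all agree, else use "default".
class SlotSharingGroupSketch {

    static String determineGroup(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) return explicitGroup;
        String inherited = null;
        for (String g : inputGroups) {
            if (inherited == null) inherited = g;
            else if (!inherited.equals(g)) return "default"; // inputs disagree (assumption)
        }
        return inherited == null ? "default" : inherited;
    }

    public static void main(String[] args) {
        System.out.println(determineGroup("name", List.of("a")));   // name
        System.out.println(determineGroup(null, List.of("a", "a"))); // a
        System.out.println(determineGroup(null, List.of("a", "b"))); // default
    }
}
```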




**Scala**

---

Transformation: Start new chain

Description: Begin a new chain, starting with this operator. The two mappers will be chained, and the filter will not be chained to the first mapper.

```
someStream.filter(...).map(...).startNewChain().map(...)
```



---

Transformation: Disable chaining

Description: Do not chain the map operator.

```
someStream.map(...).disableChaining()
```



---

Transformation: Set slot sharing group

Description: Sets the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot, while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots. The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default"; operations can explicitly be put into this group by calling slotSharingGroup("default").

```
someStream.filter(...).slotSharingGroup("name")
```

