27.md 49.5 KB
Newer Older
W
wizardforcel 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 861 862 863 864 865 866 867 868 869 870 871 872 873 874 875 876 877 878 879 880 881 882 883 884 885 886 887 888 889 890 891 892 893 894 895 896 897 898 899 900 901 902 903 904 905 906 907 908 909 910 911 912 913 914 915 916 917 918 919 920 921 922 923 924 925 926 927 928 929 930 931 932 933 934 935 936 937 938 939 940 941 942 943 944 945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973 974 975 976 977 978 979 980 981 982 983 984 985 986 987 988 989 990 991 992 993 994 995 996 997 998 999 1000 1001 1002 1003 1004 1005 1006 1007 1008 1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055 1056 1057 1058 1059 1060 1061 1062 1063 1064 1065 1066 1067 1068 1069 1070 1071 1072 1073 1074 1075 1076 1077 1078 1079 1080 1081 1082 1083 1084 1085 1086 1087 1088 1089 1090 1091 1092 1093 1094 1095 1096 1097 1098 1099 1100 1101 1102 1103 1104 1105 1106 1107 1108 1109 1110 1111 1112 1113 1114 1115 1116 1117 1118 1119 1120 1121 1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 1140 1141 1142 1143 1144 1145 1146 1147 1148 1149 1150 1151 1152 1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 1165 1166 1167 1168 1169 1170 1171 1172 1173 1174 1175 1176 1177 1178 1179 1180 1181 1182 1183 1184 1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 1202 1203 1204 1205 1206 1207 1208 1209 1210 1211 1212 1213 1214 1215 1216 1217 1218 1219 1220 1221 1222 1223 1224 1225 1226 1227 1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 1264 1265 1266 1267 1268 1269 1270 1271 1272 1273 1274


# 视窗

> 译者:[flink.sojb.cn](https://flink.sojb.cn/)


Windows是处理无限流的核心。Windows将流拆分为有限大小的“桶”,我们可以在其上应用计算。本文档重点介绍如何在Flink中执行窗口,以及程序员如何从其提供的函数中获益最大化。

窗口Flink程序的一般结构如下所示。第一个片段指的是_被Keys化_流,而第二个片段指的_是非__被Keys化_流。正如人们所看到的,唯一的区别是`keyBy(...)`呼吁Keys流和`window(...)`成为`windowAll(...)`非被Key化的数据流。这也将作为页面其余部分的路线图。

**被Keys化Windows**

```
stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag" 
```

**非被Keys化Windows**

```
stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag" 
```

在上面,方括号([...])中的命令是可选的。这表明Flink允许您以多种不同方式自定义窗口逻辑,以便最适合您的需求。

## 窗口生命周期

简而言之,只要应该属于此窗口的第一个数据元到达,就会**创建**一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定 时,窗口将被**完全删除**`allowed lateness`(请参阅[允许的延迟)](#allowed-lateness))。Flink保证仅删除基于时间的窗口而不是其他类型,_例如_全局窗口(请参阅[窗口分配器](#window-assigners))。例如,使用基于事件时间的窗口策略,每5分钟创建一个非重叠(或翻滚)的窗口,并允许延迟1分钟,Flink将创建一个新窗口,用于间隔`12:00``12:05`当具有落入此间隔的时间戳的第一个数据元到达时,当水印通过`12:06` 时间戳时它将删除它。

此外,每个窗口将具有`Trigger`(参见[触发器](#triggers))和一个函数(`ProcessWindowFunction``ReduceFunction``AggregateFunction``FoldFunction`)(见[窗口函数](#window-functions))连接到它。该函数将包含要应用于窗口内容的计算,而`Trigger`指定窗口被认为准备好应用该函数的条件。触发策略可能类似于“当窗口中的数据元数量大于4”时,或“当水印通过窗口结束时”。触发器还可以决定在创建和删除之间的任何时间清除窗口的内容。在这种情况下,清除仅指窗口中的数据元,而_不是_窗口元数据。这意味着仍然可以将新数据添加到该窗口。

除了上述内容之外,您还可以指定一个`Evictor`(参见[Evictors](#evictors)),它可以在触发器触发后以及应用函数之前和/或之后从窗口中删除数据元。

在下文中,我们将详细介绍上述每个组件。在转到可选部分之前,我们从上面代码段中的必需部分开始(请参阅[被Keys化与非被Keys化窗口](#keyed-vs-non-keyed-windows)[窗口分配器](#window-assigner)[窗口函数](#window-function))。

## 被Keys化与非被Keys化Windows

要指定的第一件事是您的流是否应该键入。必须在定义窗口之前完成此 算子操作。使用the `keyBy(...)`将您的无限流分成逻辑被Key化的数据流。如果`keyBy(...)`未调用,则表示您的流不是被Keys化的。

对于被Key化的数据流,可以将传入事件的任何属性用作键([此处有](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)更多详细信息)。拥有被Key化的数据流将允许您的窗口计算由多个任务并行执行,因为每个逻辑被Key化的数据流可以独立于其余任务进行处理。引用相同Keys的所有数据元将被发送到同一个并行任务。

在非被Key化的数据流的情况下,您的原始流将不会被拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,_即_并行度为1。

## 窗口分配器

指定您的流是否已键入后,下一步是定义一个_窗口分配器_。窗口分配器定义如何将数据元分配给窗口。这是通过`WindowAssigner``window(...)`(对于_被Keys化_流)或`windowAll()`(对于_非被Keys化_流)调用中指定您的选择来完成的。

A `WindowAssigner`负责将每个传入数据元分配给一个或多个窗口。Flink带有预定义的窗口分配器,用于最常见的用例,即_翻滚窗口_, _滑动窗口_,_会话窗口_和_全局窗口_。您还可以通过扩展`WindowAssigner`类来实现自定义窗口分配器。所有内置窗口分配器(全局窗口除外)都根据时间为窗口分配数据元,这可以是处理时间或事件时间。请查看我们关于[活动时间](https://flink.sojb.cn/dev/event_time.html)的部分,了解处理时间和事件时间之间的差异以及时间戳和水印的生成方式。

基于时间的窗口具有_开始时间戳_(包括)和_结束时间戳_(不包括),它们一起描述窗口的大小。在代码中,Flink在使用`TimeWindow`基于时间的窗口时使用,该窗口具有查询开始和结束时间戳的方法`maxTimestamp()`,以及返回给定窗口的最大允许时间戳的附加方法。

在下文中,我们将展示Flink的预定义窗口分配器如何工作以及如何在DataStream程序中使用它们。下图显示了每个分配者的工作情况。紫色圆圈表示流的数据元,这些数据元由某个键(在这种情况下是_用户1_,_用户2_和_用户3_)划分。x轴显示时间的进度。

### 翻滚的Windows

一个_翻滚窗口_分配器的每个数据元分配给指定的窗口_的窗口大小_。翻滚窗具有固定的尺寸,不重叠。例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口,如下图所示。

![](img/tumbling-windows.svg)

以下代码段显示了如何使用翻滚窗口。

*   [**Java**](#tab_java_0)
*   [**Scala**](#tab_scala_0)



```
DataStream<T> input = ...;

// tumbling event-time windows
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// tumbling processing-time windows
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// daily tumbling event-time windows offset by -8 hours.
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
```





```
val input: DataStream[T] = ...

// tumbling event-time windows input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// tumbling processing-time windows input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

// daily tumbling event-time windows offset by -8 hours. input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
```



时间间隔可以通过使用一个指定`Time.milliseconds(x)``Time.seconds(x)``Time.minutes(x)`,等等。

如上一个示例所示,翻滚窗口分配器还采用可选`offset` 参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时翻滚窗口划时代对齐,这是你会得到如窗口 `1:00:00.000 - 1:59:59.999``2:00:00.000 - 2:59:59.999`等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 `1:15:00.000 - 2:14:59.999``2:15:00.000 - 3:14:59.999`等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量`Time.hours(-8)`

### 滑动窗口

该_滑动窗口_分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,_窗口大小_由_窗口大小_参数配置。附加的_窗口滑动_参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。

例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件,如下图所示。

![](img/sliding-windows.svg)

以下代码段显示了如何使用滑动窗口。

*   [**Java**](#tab_java_1)
*   [**Scala**](#tab_scala_1)



```
DataStream<T> input = ...;

// sliding event-time windows
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
```





```
val input: DataStream[T] = ...

// sliding event-time windows input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// sliding processing-time windows offset by -8 hours input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)
```



时间间隔可以通过使用一个指定`Time.milliseconds(x)``Time.seconds(x)``Time.minutes(x)`,等等。

如上一个示例所示,滑动窗口分配器还采用可选`offset`参数,可用于更改窗口的对齐方式。例如,如果没有偏移每小时窗口半小时滑动与时代一致,那就是你会得到如窗口 `1:00:00.000 - 1:59:59.999``1:30:00.000 - 2:29:59.999`等等。如果你想改变它,你可以给出一个偏移量。随着15分钟的偏移量,你会,例如,拿 `1:15:00.000 - 2:14:59.999``1:45:00.000 - 2:44:59.999`等一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,您必须指定偏移量`Time.hours(-8)`

### 会话窗口

在_会话窗口_中按活动会话分配器组中的数据元。与_翻滚窗口_和_滑动窗口_相比,会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,_即_当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态_会话间隙_或 _会话间隙提取器_函数,该函数定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续数据元将分配给新的会话窗口。

![](img/session-windows.svg)

以下代码段显示了如何使用会话窗口。

*   [**Java**](#tab_java_2)
*   [**Scala**](#tab_scala_2)



```
DataStream<T> input = ...;

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // determine and return session gap
    }))
    .<windowed transformation>(<window function>);
```





```
val input: DataStream[T] = ...

// event-time session windows with static gap input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// processing-time session windows with dynamic gap input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
```



静态间隙可以通过使用中的一个来指定`Time.milliseconds(x)``Time.seconds(x)``Time.minutes(x)`,等。

通过实现`SessionWindowTimeGapExtractor`接口指定动态间隙。

注意由于会话窗口没有固定的开始和结束,因此它们的评估方式与翻滚和滑动窗口不同。在内部,会话窗口算子为每个到达的记录创建一个新窗口,如果它们彼此之间的距离比定义的间隙更接近,则将窗口合并在一起。为了可合并的,会话窗口 算子操作者需要一个合并[触发器](#triggers)和一个合并 [的窗函数](#window-functions),如`ReduceFunction``AggregateFunction`,或`ProcessWindowFunction``FoldFunction`不能合并。)

### 全局Windows

一个_全局性的窗口_分配器分配使用相同的Keys相同的单个的所有数据元_全局窗口_。此窗口方案仅在您还指定自定义[触发器](#triggers)时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合数据元的自然结束。

![](img/non-windowed.svg)

以下代码段显示了如何使用全局窗口。

*   [**Java**](#tab_java_3)
*   [**Scala**](#tab_scala_3)



```
DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);
```





```
val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)
```



## 窗口函数

定义窗口分配器后,我们需要指定要在每个窗口上执行的计算。这是_窗口函数_的职责,_窗口函数_用于在系统确定窗口准备好进行处理后处理每个(可能是被Keys化的)窗口的数据元(请参阅Flink如何确定窗口何时准备好的[触发器](#triggers))。

的窗函数可以是一个`ReduceFunction``AggregateFunction``FoldFunction``ProcessWindowFunction`。前两个可以更有效地执行(参见[State Size](#state size)部分),因为Flink可以在每个窗口到达时递增地聚合它们的数据元。A `ProcessWindowFunction`获取`Iterable`窗口中包含的所有数据元以及有关数据元所属窗口的其他元信息。

具有a的窗口转换`ProcessWindowFunction`不能像其他情况一样有效地执行,因为Flink必须在调用函数之前在内部缓冲窗口的_所有_数据元。这可以通过组合来减轻`ProcessWindowFunction``ReduceFunction``AggregateFunction``FoldFunction`以获得两个窗口元件的增量聚合并且该附加元数据窗口 `ProcessWindowFunction`接收。我们将查看每个变体的示例。

### ReduceFunction

A `ReduceFunction`指定如何组合输入中的两个数据元以生成相同类型的输出数据元。Flink使用a `ReduceFunction`来递增地聚合窗口的数据元。

A `ReduceFunction`可以像这样定义和使用:

*   [**Java**](#tab_java_4)
*   [**Scala**](#tab_scala_4)



```
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>> {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
```





```
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
```



上面的示例总结了窗口中所有数据元的元组的第二个字段。

### 聚合函数

An `AggregateFunction`是一个通用版本,`ReduceFunction`它有三种类型:输入类型(`IN`),累加器类型(`ACC`)和输出类型(`OUT`)。输入类型是输入流中数据元的类型,并且`AggregateFunction`具有将一个输入数据元添加到累加器的方法。该接口还具有用于创建初始累加器的方法,用于将两个累加器合并到一个累加器中以及用于`OUT`从累加器提取输出(类型)。我们将在下面的示例中看到它的工作原理。

与之相同`ReduceFunction`,Flink将在窗口到达时递增地聚合窗口的输入数据元。

一个`AggregateFunction`可以被定义并这样使用:

*   [**Java**](#tab_java_5)
*   [**Scala**](#tab_scala_5)



```
/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
```





```
/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate)
```



上面的示例计算窗口中数据元的第二个字段的平均值。

### FoldFunction

A `FoldFunction`指定窗口的输入数据元如何与输出类型的数据元组合。所述`FoldFunction`递增称为该被添加到窗口和电流输出值的每个数据元。第一个数据元与输出类型的预定义初始值组合。

A `FoldFunction`可以像这样定义和使用:

*   [**Java**](#tab_java_6)
*   [**Scala**](#tab_scala_6)



```
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>> {
       public String fold(String acc, Tuple2<String, Long> value) {
         return acc + value.f1;
       }
    });
```





```
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("") { (acc, v) => acc + v._2 }
```



上面的示例将所有输入`Long`值附加到最初为空`String`

注意 `fold()`不能与会话窗口或其他可合并窗口一起使用。

### ProcessWindowFunction

ProcessWindowFunction获取包含窗口的所有数据元的Iterable,以及可访问时间和状态信息的Context对象,这使其能够提供比其他窗口函数更多的灵活性。这是以性能和资源消耗为代价的,因为数据元不能以递增方式聚合,而是需要在内部进行缓冲,直到窗口被认为已准备好进行处理。

`ProcessWindowFunction`外观签名如下:

*   [**Java**](#tab_java_7)
*   [**Scala**](#tab_scala_7)



```
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

   	/**
   	 * The context holding window metadata.
   	 */
   	public abstract class Context implements java.io.Serializable {
   	    /**
   	     * Returns the window that is being evaluated.
   	     */
   	    public abstract W window();

   	    /** Returns the current processing time. */
   	    public abstract long currentProcessingTime();

   	    /** Returns the current event-time watermark. */
   	    public abstract long currentWatermark();

   	    /**
   	     * State accessor for per-key and per-window state.
   	     *
   	     * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
   	     * by implementing {@link ProcessWindowFunction#clear(Context)}.
   	     */
   	    public abstract KeyedStateStore windowState();

   	    /**
   	     * State accessor for per-key global state.
   	     */
   	    public abstract KeyedStateStore globalState();
   	}

}
```





```
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key      The key for which this window is evaluated.
    * @param context  The context in which the window is being evaluated.
    * @param elements The elements in the window being evaluated.
    * @param out      A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])

  /**
    * The context holding window metadata
    */
  abstract class Context {
    /**
      * Returns the window that is being evaluated.
      */
    def window: W

    /**
      * Returns the current processing time.
      */
    def currentProcessingTime: Long

    /**
      * Returns the current event-time watermark.
      */
    def currentWatermark: Long

    /**
      * State accessor for per-key and per-window state.
      */
    def windowState: KeyedStateStore

    /**
      * State accessor for per-key global state.
      */
    def globalState: KeyedStateStore
  }

}
```



注意该`key`参数是通过`KeySelector``keyBy()`调用指定的Keys提取的Keys。在元组索引键或字符串字段引用的情况下,此键类型始终是`Tuple`,您必须手动将其转换为正确大小的元组以提取键字段。

A `ProcessWindowFunction`可以像这样定义和使用:

*   [**Java**](#tab_java_8)
*   [**Scala**](#tab_scala_8)



```
DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}
```





```
val input: DataStream[(String, Long)] = ...

input
  .keyBy(_._1)
  .timeWindow(Time.minutes(5))
  .process(new MyProcessWindowFunction())

/* ... */

class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {

  def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window ${context.window} count: $count")
  }
}
```



该示例显示了`ProcessWindowFunction`对窗口中的数据元进行计数的情况。此外,窗口函数将有关窗口的信息添加到输出。

注意注意,使用`ProcessWindowFunction`简单的聚合(例如count)是非常低效的。下一节将介绍a `ReduceFunction`或如何`AggregateFunction`与a结合使用`ProcessWindowFunction`以获得增量聚合和a的附加信息`ProcessWindowFunction`

### ProcessWindowFunction with Incremental Aggregation

当a 数据元到达窗口时,A `ProcessWindowFunction`可以与a `ReduceFunction`,an `AggregateFunction`或a组合`FoldFunction`以递增地聚合数据元。当窗口关闭时,`ProcessWindowFunction`将提供聚合结果。这允许它在访问附加窗口元信息的同时递增地计算窗口`ProcessWindowFunction`

注意您也可以使用旧版`WindowFunction`而不是 `ProcessWindowFunction`增量窗口聚合。

#### 使用ReduceFunction增量窗口聚合

以下示例显示如何将增量`ReduceFunction`与a组合`ProcessWindowFunction`以返回窗口中的最小事件以及窗口的开始时间。

*   [**Java**](#tab_java_9)
*   [**Scala**](#tab_scala_9)



```
DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
  }
}
```





```
val input: DataStream[SensorReading] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      window: TimeWindow,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((window.getStart, min))
      }
  )
```



#### 使用AggregateFunction进行增量窗口聚合

以下示例显示如何将增量`AggregateFunction`与a组合`ProcessWindowFunction`以计算平均值,并同时发出键和窗口以及平均值。

*   [**Java**](#tab_java_10)
*   [**Scala**](#tab_scala_10)



```
DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}
```





```
val input: DataStream[(String, Long)] = ...

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction())

// Function definitions 
/**
 * The accumulator is used to keep a running sum and a count. The [getResult] method
 * computes the average.
 */
class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
  override def createAccumulator() = (0L, 0L)

  override def add(value: (String, Long), accumulator: (Long, Long)) =
    (accumulator._1 + value._2, accumulator._2 + 1L)

  override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2

  override def merge(a: (Long, Long), b: (Long, Long)) =
    (a._1 + b._1, a._2 + b._2)
}

class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {

  def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double]): () = {
    val average = averages.iterator.next()
    out.collect((key, average))
  }
}
```



#### 使用FoldFunction进行增量窗口聚合

以下示例显示如何将增量`FoldFunction`与a组合`ProcessWindowFunction`以提取窗口中的事件数,并返回窗口的键和结束时间。

*   [**Java**](#tab_java_11)
*   [**Scala**](#tab_scala_11)



```
DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .timeWindow(<duration>)
  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())

// Function definitions

private static class MyFoldFunction
    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {

  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
      Integer cur = acc.getField(2);
      acc.setField(cur + 1, 2);
      return acc;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Tuple3<String, Long, Integer>> counts,
                    Collector<Tuple3<String, Long, Integer>> out) {
    Integer count = counts.iterator().next().getField(2);
    out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
  }
}
```





```
val input: DataStream[SensorReading] = ...

input
 .keyBy(<key selector>)
 .timeWindow(<duration>)
 .fold (
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    ( key: String,
      window: TimeWindow,
      counts: Iterable[(String, Long, Int)],
      out: Collector[(String, Long, Int)] ) =>
      {
        val count = counts.iterator.next()
        out.collect((key, window.getEnd, count._3))
      }
  )
```



### 在ProcessWindowFunction中使用每窗口状态

除了访问被Keys化状态(如任何丰富的函数可以),a `ProcessWindowFunction`还可以使用被Keys化状态,该被Keys化状态的作用域是函数当前正在处理的窗口。在这种情况下,了解_每个窗口_状态所指的窗口是很重要的。涉及不同的“窗口”:

*   指定窗口 算子操作时定义的窗口:这可能是_1小时的翻滚窗口_或_滑动__1小时__的2小时滑动窗口_。
*   给定键的已定义窗口的实际实例:_对于user-id xyz,_这可能是_从12:00到13:00的时间窗口_。这基于窗口定义,并且将基于作业当前正在处理的键的数量以及基于事件落入的时隙而存在许多窗口。

每窗口状态与后两者相关联。这意味着如果我们处理1000个不同键的事件,并且所有这些事件的事件当前都落入_[12:00,13:00]_时间窗口,那么将有1000个窗口实例,每个窗口实例都有自己的按键每窗口状态。

调用接收的`Context`对象有两种方法`process()`允许访问两种类型的状态:

*   `globalState()`,允许访问没有作用于窗口的被Keys化状态
*   `windowState()`,允许访问也限定在窗口范围内的被Keys化状态

如果您预计同一窗口会多次触发,则此函数非常有用,如果您对迟到的数据进行后期触发或者您有自定义触发器进行推测性早期触发时可能会发生这种情况。在这种情况下,您将存储有关先前点火的信息或每个窗口状态的点火次数。

使用窗口状态时,清除窗口时清除该状态也很重要。这应该在`clear()`方法中发生。

### WindowFunction(留存)

在一些`ProcessWindowFunction`可以使用的地方你也可以使用`WindowFunction`。这是较旧版本`ProcessWindowFunction`,提供较少的上下文信息,并且没有一些高级函数,例如每窗口被Keys化状态。此接口将在某个时候弃用。

`WindowFunction`外观的签名如下:

*   [**Java**](#tab_java_12)
*   [**Scala**](#tab_scala_12)



```
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

  /**
   * Evaluates the window and outputs none or several elements.
   *
   * @param key The key for which this window is evaluated.
   * @param window The window that is being evaluated.
   * @param input The elements in the window being evaluated.
   * @param out A collector for emitting elements.
   *
   * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
   */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
```





```
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {

  /**
    * Evaluates the window and outputs none or several elements.
    *
    * @param key    The key for which this window is evaluated.
    * @param window The window that is being evaluated.
    * @param input  The elements in the window being evaluated.
    * @param out    A collector for emitting elements.
    * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
```



它可以像这样使用:

*   [**Java**](#tab_java_13)
*   [**Scala**](#tab_scala_13)



```
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());
```





```
val input: DataStream[(String, Long)] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction())
```



## 触发器

A `Trigger`确定何时_窗口函数_准备好处理窗口(由_窗口分配器_形成)。每个都有默认值。如果默认触发器不符合您的需要,您可以使用指定自定义触发器。`WindowAssigner``Trigger``trigger(...)`

触发器界面有五种方法可以`Trigger`对不同的事件做出反应:

*   `onElement()`为添加到窗口的每个数据元调用该方法。
*   `onEventTime()`在注册的事件时间计时器触发时调用该方法。
*   `onProcessingTime()`在注册的处理时间计时器触发时调用该方法。
*`onMerge()`方法与状态触发器相关,并且当它们的相应窗口合并时合并两个触发器的状态,_例如_当使用会话窗口时。
*   最后,该`clear()`方法在移除相应窗口时执行所需的任何动作。

关于上述方法需要注意两点:

1)前三个决定如何通过返回a来对其调用事件进行 算子操作`TriggerResult`。该 算子操作可以是以下之一:

*   `CONTINUE`: 没做什么,
*   `FIRE`:触发​​计算,
*   `PURGE`:清除窗口中的数据元,和
*   `FIRE_AND_PURGE`:触发​​计算并清除窗口中的数据元。

2)这些方法中的任何一种都可用于注册处理或事件时间计时器以用于将来的 算子操作。

### 火与清除

一旦触发器确定窗口已准备好进行处理,它就会触发,_即_它返回`FIRE``FIRE_AND_PURGE`。这是窗口算子发出当前窗口结果的信号。给定一个窗口,将`ProcessWindowFunction` 所有数据元传递给`ProcessWindowFunction`(可能在将它们传递给逐出器后)。窗口`ReduceFunction``AggregateFunction``FoldFunction`简单地发出他们急切地汇总结果。

当触发器触发时,它可以`FIRE`或者`FIRE_AND_PURGE`。虽然`FIRE`保持了窗口的内容,`FIRE_AND_PURGE`删除其内容。默认情况下,预先实现的触发器只是`FIRE`没有清除窗口状态。

注意清除将简单地删除窗口的内容,并将保存有关窗口和任何触发状态的任何潜在元信息。

### WindowAssigners的默认触发器

默认`Trigger``WindowAssigner`是适用于许多使用情况。例如,所有事件时间窗口分配器都具有`EventTimeTrigger`默认触发器。一旦水印通过窗口的末端,该触发器就会触发。

注意默认触发器`GlobalWindow``NeverTrigger`从不触发的。因此,在使用时必须定义自定义触发器`GlobalWindow`

注意通过使用`trigger()`您指定触发器会覆盖a的默认触发器`WindowAssigner`。例如,如果指定a `CountTrigger``TumblingEventTimeWindows`则不再根据时间进度获取窗口,而是仅按计数。现在,如果你想根据时间和数量做出反应,你必须编写自己的自定义触发器。

### 内置和自定义触发器

Flink附带了一些内置触发器。

*   (已经提到的)`EventTimeTrigger`基于水印测量的事件时间的进展而触发。
*`ProcessingTimeTrigger`基于处理时间的火灾。
*   `CountTrigger`一旦窗口中的数据元数量超过给定限制,就会触发。
*`PurgingTrigger`另一个触发器作为参数作为参数并将其转换为清除触发器。

如果需要实现自定义触发器,则应该检查抽象 [Trigger](https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java)类。请注意,API仍在不断发展,可能会在Flink的未来版本中发生变化。

## 逐出器

Flink的窗口模型允许指定`Evictor`除了`WindowAssigner`和之外的可选项`Trigger`。这可以使用`evictor(...)`方法(在本文档的开头显示)来完成。所述逐出器必须从一个窗口中删除数据元的能力_之后_触发器触发和_之前和/或之后_被施加的窗口函数。为此,该`Evictor`接口有两种方法:

```
/**
 * Optionally evicts elements. Called before windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
 * Optionally evicts elements. Called after windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext); 
```

`evictBefore()`包含窗口函数之前被施加驱逐逻辑,而`evictAfter()` 包含窗口函数之后要施加的一个。在应用窗口函数之前被逐出的数据元将不会被处理。

Flink带有三个预先实施的驱逐者。这些是:

*   `CountEvictor`:保持窗口中用户指定数量的数据元,并从窗口缓冲区的开头丢弃剩余的数据元。
*   `DeltaEvictor`:取a `DeltaFunction`和a `threshold`,计算窗口缓冲区中最后一个数据元与其余每个数据元之间的差值,并删除delta大于或等于阈值的值。
*   `TimeEvictor`:以`interval`毫秒为单位作为参数,对于给定窗口,它查找`max_ts`其数据元中的最大时间戳,并删除时间戳小于的所有数据元`max_ts - interval`

默认默认情况下,所有预先实现的驱逐程序在窗口函数之前应用它们的逻辑。

注意指定逐出器会阻止任何预聚合,因为在应用计算之前,必须将窗口的所有数据元传递给逐出器。

注意 Flink不保证窗口中数据元的顺序。这意味着尽管逐出器可以从窗口的开头移除数据元,但这些数据元不一定是首先或最后到达的数据元。

## 允许迟到

当使用_事件时间_窗口时,可能会发生数据元迟到的情况,_即_ Flink用于跟踪事件时间进度的水印已经超过数据元所属的窗口的结束时间戳。查看 [事件时间](https://flink.sojb.cn/dev/event_time.html),特别是[后期数据元,](https://flink.sojb.cn/dev/event_time.html#late-elements)以便更全面地讨论Flink如何处理事件时间。

默认情况下,当水印超过窗口末尾时,会删除延迟数据元。但是,Flink允许为窗口 算子指定最大_允许延迟_。允许延迟指定数据元在被删除之前可以延迟多少时间,并且其默认值为0.在水印通过窗口结束之后但在通过窗口结束加上允许的延迟之前到达的数据元,仍然添加到窗口中。根据使用的触发器,延迟但未丢弃的数据元可能会导致窗口再次触发。就是这种情况`EventTimeTrigger`

为了使这项工作,Flink保持窗口的状态,直到他们允许的延迟到期。一旦发生这种情况,Flink将删除窗口并删除其状态,如“ [窗口生命周期”](#window-lifecycle)部分中所述。

默认默认情况下,允许的延迟设置为 `0`。也就是说,到达水印后面的数据元将被丢弃。

你可以像这样指定一个允许的迟到:

*   [**Java**](#tab_java_14)
*   [**Scala**](#tab_scala_14)



```
DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);
```





```
val input: DataStream[T] = ...

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>)
```



注意使用`GlobalWindows`窗口分配器时,由于全局窗口的结束时间戳为,因此没有数据被认为是迟到的`Long.MAX_VALUE`

### 将后期数据作为副输出

使用Flink的[旁路输出](https://flink.sojb.cn/dev/stream/side_output.html)函数,您可以获得最近丢弃的数据流。

首先需要指定您希望`sideOutputLateData(OutputTag)`在窗口流上使用延迟数据。然后,您可以在窗口 算子操作的结果上获取旁路输出流:

*   [**Java**](#tab_java_15)
*   [**Scala**](#tab_scala_15)



```
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
```





```
val lateOutputTag = OutputTag[T]("late-data")

val input: DataStream[T] = ...

val result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>)

val lateStream = result.getSideOutput(lateOutputTag)
```



### 迟到数据元考虑因素

当指定允许的延迟大于0时,在水印通过窗口结束后保持窗口及其内容。在这些情况下,当迟到但未掉落的数据元到达时,它可能触发窗口的另一次触发。这些射击被称为`late firings`,因为它们是由迟到事件触发的,与之相反的`main firing` 是窗口的第一次射击。在会话窗口的情况下,后期点火可以进一步导致窗口的合并,因为它们可以“桥接”两个预先存在的未合并窗口之间的间隙。

注意您应该知道,后期触发发出的数据元应该被视为先前计算的更新结果,即,您的数据流将包含同一计算的多个结果。根据您的应用程序,您需要考虑这些重复的结果或对其进行重复数据删除。

## 使用窗口结果

窗口 算子操作的结果也是a `DataStream`,没有关于窗口 算子操作的信息保存在结果数据元中,所以如果你想保存关于窗口的元信息,你必须在你的结果数据元中手动编码该信息 `ProcessWindowFunction`。在结果数据元上设置的唯一相关信息是数据元_时间戳_。这被设置为已处理窗口的最大允许时间戳,即_结束时间戳-1_,因为窗口结束时间戳是独占的。请注意,事件时间窗口和处理时间窗口都是如此。即,在窗口化 算子操作数据元之后始终具有时间戳,但这可以是事件时间时间戳或处理时间时间戳。对于处理时间窗口,这没有特别的含义,但对于事件时间窗口,这与水印与窗口交互的方式一起启用 具有相同窗口大小的[连续窗口 算子操作](#consecutive-windowed-operations)。在看了水印如何与窗口交互后,我们将介绍这一点。

### 水印和窗口的互动

在继续本节之前,您可能需要查看有关 [事件时间和水印的部分](https://flink.sojb.cn/dev/event_time.html)

当水印到达窗口 算子时,会触发两件事:

*   水印触发所有窗口的计算,其中最大时间戳(即 _结束时间戳-1_)小于新水印
*   水印被转发(按原样)到下游 算子操作

直观地,水印“冲出”任何窗口,一旦接收到该水印,将在下游 算子操作中被认为是迟到。

### 连续窗口 算子操作

如前所述,计算窗口结果的时间戳的方式以及水印与窗口交互的方式允许将连续的窗口 算子操作串联在一起。当您想要执行两个连续的窗口 算子操作时,这可能很有用,您希望使用不同的键,但仍希望来自同一上游窗口的数据元最终位于同一下游窗口中。考虑这个例子:

*   [**Java**](#tab_java_16)
*   [**Scala**](#tab_scala_16)



```
DataStream<Integer> input = ...;

DataStream<Integer> resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer());

DataStream<Integer> globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction());
```





```
val input: DataStream[Int] = ...

val resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer())

val globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction())
```



在该示例中,`[0, 5)`来自第一 算子操作的时间窗口的结果也将`[0, 5)`在随后的窗口化 算子操作中的时间窗口中结束。这允许计算每个键的Sum然后在第二个 算子操作中计算同一窗口内的前k个数据元。

## 有用的状态规模考虑因素

Windows可以在很长一段时间内(例如几天,几周或几个月)定义,因此可以累积非常大的状态。在估算窗口计算的存储要求时,需要记住几条规则:

1.  Flink为每个窗口创建一个每个数据元的副本。鉴于此,翻滚窗口保存每个数据元的一个副本(一个数据元恰好属于一个窗口,除非它被延迟)。相反,滑动窗口会创建每个数据元的几个,如“ [窗口分配器”](#window-assigners)部分中所述。因此,尺寸为1天且滑动1秒的滑动窗口可能不是一个好主意。

2.  `ReduceFunction``AggregateFunction`并且`FoldFunction`可以显着降低存储要求,因为它们急切地聚合数据元并且每个窗口只存储一个值。相反,仅使用a `ProcessWindowFunction`需要累积所有数据元。

3.  使用an `Evictor`可以防止任何预聚合,因为在应用计算之前,窗口的所有数据元都必须通过逐出器传递(参见[Evictors](#evictors))。