diff --git a/docs/1.7-SNAPSHOT/13.md b/docs/1.7-SNAPSHOT/13.md index 03f867c132e6805d3c7ad5cfdd35742029b54f9c..51ea9712e9370f85c37df16513c7d05485dc2f2d 100644 --- a/docs/1.7-SNAPSHOT/13.md +++ b/docs/1.7-SNAPSHOT/13.md @@ -50,11 +50,13 @@ val data: DataSet[(Int, String, Double)] = // [...] data.map { #### DataSet API -| 方法 | 原生 | DEMO | -| --- | --- | --- | -| **mapWith** | **map(DataSet)** | +--- -<figure class="highlight"> +方法:`mapWith` + +原生:`map(DataSet)` + +DEMO: ``` data.mapWith { @@ -62,12 +64,15 @@ data.mapWith { } ``` -</figure> - | -| **mapPartitionWith** | **mapPartition(DataSet)** | -<figure class="highlight"> +--- + +方法:`mapPartitionWith` + +原生:`mapPartition(DataSet)` + +DEMO: ``` data.mapPartitionWith { @@ -75,12 +80,15 @@ data.mapPartitionWith { } ``` -</figure> - | -| **flatMapWith** | **flatMap(DataSet)** | -<figure class="highlight"> +--- + +方法:`flatMapWith` + +原生:`flatMap(DataSet)` + +DEMO: ``` data.flatMapWith { @@ -88,12 +96,15 @@ data.flatMapWith { } ``` -</figure> - | -| **filterWith** | **filter(DataSet)** | -<figure class="highlight"> +--- + +方法:`filterWith` + +原生:`filter(DataSet)` + +DEMO: ``` data.filterWith { @@ -101,12 +112,15 @@ data.filterWith { } ``` -</figure> - | -| **reduceWith** | **reduce(DataSet,GroupedDataSet)** | -<figure class="highlight"> +--- + +方法:`reduceWith` + +原生:`reduce(DataSet,GroupedDataSet)` + +DEMO: ``` data.reduceWith { @@ -114,12 +128,15 @@ data.reduceWith { } ``` -</figure> - | -| **reduceGroupWith** | **reduceGroup(GroupedDataSet)** | -<figure class="highlight"> +--- + +方法:`reduceGroupWith` + +原生:`reduceGroup(GroupedDataSet)` + +DEMO: ``` data.reduceGroupWith { @@ -127,12 +144,15 @@ data.reduceGroupWith { } ``` -</figure> - | -| **groupingBy** | **groupBy(DataSet)** | -<figure class="highlight"> +--- + +方法:`groupingBy` + +原生:`groupBy(DataSet)` + +DEMO: ``` data.groupingBy { @@ -140,12 +160,15 @@ data.groupingBy { } ``` -</figure> - | -| **sortGroupWith** | **sortGroup(GroupedDataSet)** | -<figure class="highlight"> +--- + +方法:`sortGroupWith` + +原生:`sortGroup(GroupedDataSet)` + +DEMO: ``` grouped.sortGroupWith(Order.ASCENDING) { @@ -153,12 +176,15 @@ grouped.sortGroupWith(Order.ASCENDING) { } ``` -</figure> - | -| **combineGroupWith** | **combineGroup(GroupedDataSet)** | -<figure class="highlight"> +--- + +方法:`combineGroupWith` + +原生:`combineGroup(GroupedDataSet)` + +DEMO: ``` grouped.combineGroupWith { @@ -166,12 +192,15 @@ grouped.combineGroupWith { } ``` -</figure> - | -| **projecting** | **apply(JoinDataSet,CrossDataSet)** | -<figure class="highlight"> +--- + +方法:`projecting` + +原生:`apply(JoinDataSet,CrossDataSet)` + +DEMO: ``` data1.join(data2). @@ -186,12 +215,15 @@ data1.cross(data2).projecting { } ``` -</figure> - | -| **projecting** | **apply(CoGroupDataSet)** | -<figure class="highlight"> +--- + +方法:`projecting` + +原生:`apply(CoGroupDataSet)` + +DEMO: ``` data1.coGroup(data2). @@ -203,17 +235,18 @@ data1.coGroup(data2). 
} ``` -</figure> - | + #### DataStream API -| 方法 | 原生 | DEMO | -| --- | --- | --- | -| **mapWith** | **map(DataStream)** | +--- + +方法:`mapWith` + +原生:`map(DataStream)` -<figure class="highlight"> +DEMO: ``` data.mapWith { @@ -221,12 +254,15 @@ data.mapWith { } ``` -</figure> - | -| **mapPartitionWith** | **mapPartition(DataStream)** | -<figure class="highlight"> +--- + +方法:`mapPartitionWith` + +原生:`mapPartition(DataStream)` + +DEMO: ``` data.mapPartitionWith { @@ -234,12 +270,15 @@ data.mapPartitionWith { } ``` -</figure> - | -| **flatMapWith** | **flatMap(DataStream)** | -<figure class="highlight"> +--- + +方法:`flatMapWith` + +原生:`flatMap(DataStream)` + +DEMO: ``` data.flatMapWith { @@ -247,12 +286,15 @@ data.flatMapWith { } ``` -</figure> - | -| **filterWith** | **filter(DataStream)** | -<figure class="highlight"> +--- + +方法:`filterWith` + +原生:`filter(DataStream)` + +DEMO: ``` data.filterWith { @@ -260,12 +302,15 @@ data.filterWith { } ``` -</figure> - | -| **keyingBy** | **keyBy(DataStream)** | -<figure class="highlight"> +--- + +方法:`keyingBy` + +原生:`keyBy(DataStream)` + +DEMO: ``` data.keyingBy { @@ -273,12 +318,15 @@ data.keyingBy { } ``` -</figure> - | -| **mapWith** | **map(ConnectedDataStream)** | -<figure class="highlight"> +--- + +方法:`mapWith` + +原生:`map(ConnectedDataStream)` + +DEMO: ``` data.mapWith( @@ -287,12 +335,15 @@ data.mapWith( ) ``` -</figure> - | -| **flatMapWith** | **flatMap(ConnectedDataStream)** | -<figure class="highlight"> +--- + +方法:`flatMapWith` + +原生:`flatMap(ConnectedDataStream)` + +DEMO: ``` data.flatMapWith( @@ -301,12 +352,15 @@ data.flatMapWith( ) ``` -</figure> - | -| **keyingBy** | **keyBy(ConnectedDataStream)** | -<figure class="highlight"> +--- + +方法:`keyingBy` + +原生:`keyBy(ConnectedDataStream)` + +DEMO: ``` data.keyingBy( @@ -315,12 +369,15 @@ data.keyingBy( ) ``` -</figure> - | -| **reduceWith** | **reduce(KeyedStream,WindowedStream)** | -<figure class="highlight"> +--- + +方法:`reduceWith` + +原生:`reduce(KeyedStream,WindowedStream)` + +DEMO: ``` data.reduceWith { @@ -328,12 +385,15 @@ data.reduceWith { } ``` -</figure> - | -| **foldWith** | **fold(KeyedStream,WindowedStream)** | -<figure class="highlight"> +--- + +方法:`foldWith` + +原生:`fold(KeyedStream,WindowedStream)` + +DEMO: ``` data.foldWith(User(bought = 0)) { @@ -341,12 +401,15 @@ data.foldWith(User(bought = 0)) { } ``` -</figure> - | -| **applyWith** | **apply(WindowedStream)** | -<figure class="highlight"> +--- + +方法:`applyWith` + +原生:`apply(WindowedStream)` + +DEMO: ``` data.applyWith(0)( @@ -354,12 +417,15 @@ data.applyWith(0)( windowFunction = case (k, w, sum) => // [...] ) ``` -</figure> - | -| **projecting** | **apply(JoinedStream)** | -<figure class="highlight"> +--- + +方法:`projecting` + +原生:`apply(JoinedStream)` + +DEMO: ``` data1.join(data2). @@ -370,9 +436,8 @@ data1.join(data2). 
}
```

-</figure>

- |
+

有关每种方法的语义的更多信息,请参阅 [DataSet](https://flink.sojb.cn/dev/batch/index.html)和[DataStream](https://flink.sojb.cn/dev/datastream_api.html) API文档。

diff --git a/docs/1.7-SNAPSHOT/22.md b/docs/1.7-SNAPSHOT/22.md
index 5800b57fe879f1d04c16512e699831306e27dfa3..f2506136d491866ea823159edb651c539ca32408 100644
--- a/docs/1.7-SNAPSHOT/22.md
+++ b/docs/1.7-SNAPSHOT/22.md
@@ -98,53 +98,78 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment()

 可以在`conf/flink-conf.yaml`中设置更多参数和/或默认值(参见完整指南的[配置](https://flink.sojb.cn/ops/config.html)):

-| 键 | 默认 | 描述 |
-| --- | --- | --- |
-|
-##### state.backend
+---

- | (none) | 状态后台用于存储和检查点状态。 |
-|
+键:`state.backend`

-##### state.backend.async
+默认值:`(none)`

- | true | 选择状态后台是否应在可能和可配置的情况下使用异步SNAPSHOT方法。某些状态后台可能不支持异步SNAPSHOT,或者仅支持异步SNAPSHOT,并忽略此选项。 |
-|
+描述:用于存储和检查点状态的状态后台。

-##### state.backend.fs.memory-threshold
+---

- | 1024 | 状态数据文件的最小大小。小于该值的所有状态块都内联存储在根检查点元数据文件中。 |
-|
+键:`state.backend.async`

-##### state.backend.incremental
+默认值:`true`

- | false | 如果可能,选择状态后台是否应创建增量检查点。对于增量检查点,仅存储来自先前检查点的差异,而不是完整的检查点状态。某些状态后台可能不支持增量检查点并忽略此选项。 |
-|
+描述:选择状态后台是否应在可能和可配置的情况下使用异步SNAPSHOT方法。某些状态后台可能不支持异步SNAPSHOT,或者仅支持异步SNAPSHOT,并忽略此选项。

-##### state.backend.local-recovery
+---

- | false | |
-|
+键:`state.backend.fs.memory-threshold`

-##### state.checkpoints.dir
+默认值:`1024`

- | (none) | 用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须可以从所有参与的进程/节点(即所有TaskManagers和JobManagers)访问存储路径。 |
-|
+描述:状态数据文件的最小大小。小于该值的所有状态块都内联存储在根检查点元数据文件中。

-##### state.checkpoints.num-retained
+---

- | 1 | 要retained的已完成检查点的最大数量。 |
-|
+键:`state.backend.incremental`

-##### state.savepoints.dir
+默认值:`false`

- | (none) | 保存点的默认目录。由将后台写入文件系统的状态后台(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)使用。 |
-|
+描述:如果可能,选择状态后台是否应创建增量检查点。对于增量检查点,仅存储来自先前检查点的差异,而不是完整的检查点状态。某些状态后台可能不支持增量检查点并忽略此选项。

-##### taskmanager.state.local.root - dirs
+---

- | (none) | |
+键:`state.backend.local-recovery`
+
+默认值:`false`
+
+描述:无
+
+---
+
+键:`state.checkpoints.dir`
+
+默认值:`(none)`
+
+描述:用于在Flink支持的文件系统中存储检查点的数据文件和元数据的默认目录。必须可以从所有参与的进程/节点(即所有TaskManagers和JobManagers)访问存储路径。
+
+---
+
+键:`state.checkpoints.num-retained`
+
+默认值:`1`
+
+描述:要保留的已完成检查点的最大数量。
+
+---
+
+键:`state.savepoints.dir`
+
+默认值:`(none)`
+
+描述:保存点的默认目录。由将保存点写入文件系统的状态后台(MemoryStateBackend,FsStateBackend,RocksDBStateBackend)使用。
+
+---
+
+键:`taskmanager.state.local.root-dirs`
+
+默认值:`(none)`
+
+描述:无

 ## 选择状态后台

diff --git a/docs/1.7-SNAPSHOT/26.md b/docs/1.7-SNAPSHOT/26.md
index 92f49e198385dcffab126ba2e2e46305746373a6..094c9a2904af673137d76c244ff1609d216fef7f 100644
--- a/docs/1.7-SNAPSHOT/26.md
+++ b/docs/1.7-SNAPSHOT/26.md
@@ -17,12 +17,13 @@

 * [**Java**](#tab_java_0)
 * [**Scala**](#tab_scala_0)

-| 转型 | 描述 |
-| --- | --- |
-| **映射**
-DataStream→DataStream | 采用一个数据元并生成一个数据元。一个map函数,它将输入流的值加倍:

-<figure class="highlight">
+---
+
+转型:**映射** DataStream→DataStream
+
+描述:采用一个数据元并生成一个数据元。一个map函数,它将输入流的值加倍:
+

```
DataStream<Integer> dataStream = //...
@@ -34,13 +35,14 @@ dataStream.map(new MapFunction<Integer, Integer>() {
 });
```

-</figure>

- |
-| **FlatMap**
-DataStream→DataStream | 采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数:

-<figure class="highlight">
+---
+
+转型:**FlatMap** DataStream→DataStream
+
+描述:采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数:
+

```
dataStream.flatMap(new FlatMapFunction<String, String>() {
@@ -54,13 +56,14 @@ dataStream.flatMap(new FlatMapFunction<String, String>() {
 });
```

-</figure>

- |
-| **Filter**
-DataStream→DataStream | 计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器:

-<figure class="highlight">
+---
+
+转型:**Filter** DataStream→DataStream
+
+描述:对每个数据元计算布尔函数,并保留函数返回true的数据元。过滤掉零值的过滤器:
+

```
dataStream.filter(new FilterFunction<Integer>() {
@@ -71,33 +74,35 @@ dataStream.filter(new FilterFunction<Integer>() {
 });
```

-</figure>

- |
-| **KeyBy**
-DataStream→KeyedStream | 逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,_keyBy()_是使用散列分区实现的。[指定键](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)有不同的方法。此转换返回_KeyedStream_,其中包括使用[被Keys化状态](https://flink.sojb.cn/dev/stream/state/state.html#keyed-state)所需的_KeyedStream_。

-<figure class="highlight">
+---
+
+转型:**KeyBy** DataStream→KeyedStream
+
+描述:逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,_keyBy()_是使用散列分区实现的。[指定键](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys)有不同的方法。此转换返回_KeyedStream_,它是使用[被Keys化状态](https://flink.sojb.cn/dev/stream/state/state.html#keyed-state)所必需的。
+

```
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
```

-</figure>

注意 如果出现以下情况,则类型**不能作为键**:

1. 它是POJO类型但不覆盖_hashCode()_方法并依赖于_Object.hashCode()_实现。
2. 它是任何类型的数组。

- |
+
+---
+
+转型:**Reduce** KeyedStream→DataStream
+
+描述:被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值。

reduce函数,用于创建部分和的流:

-<figure class="highlight">

```
keyedStream.reduce(new ReduceFunction<Integer>() {
@@ -109,15 +114,16 @@ keyedStream.reduce(new ReduceFunction<Integer>() {
 });
```

-</figure>

- |
+
+---
+
+转型:**折叠** KeyedStream→DataStream
+
+描述:具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值。

折叠函数,当应用于序列(1,2,3,4,5)时,发出序列“start-1”,“start-1-2”,“start-1-2-3”,...
-<figure class="highlight">

```
DataStream<String> result =
@@ -129,13 +135,14 @@ DataStream<String> result =
 });
```

-</figure>

- |
-| **聚合**
-KeyedStream→DataStream | 在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。

-<figure class="highlight">
+---
+
+转型:**聚合** KeyedStream→DataStream
+
+描述:在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。
+

```
keyedStream.sum(0);
@@ -150,38 +157,40 @@ keyedStream.maxBy(0);
keyedStream.maxBy("key");
```

-</figure>

- |
-| **Window**
-KeyedStream→WindowedStream | 可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关[窗口](windows.html)的完整说明,请参见windows。

-<figure class="highlight">
+---
+
+转型:**Window** KeyedStream→WindowedStream
+
+描述:可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组。有关[窗口](windows.html)的完整说明,请参见windows。
+

```
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
```

-</figure>

- |
-| **WindowAll**
-DataStream→AllWindowedStream | Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。有关[窗口](windows.html)的完整说明,请参见windows。**警告:**在许多情况下,这**是非并行**转换。所有记录将收集在windowAll 算子的一个任务中。

-<figure class="highlight">
+---
+
+转型:**WindowAll** DataStream→AllWindowedStream
+
+描述:Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。有关[窗口](windows.html)的完整说明,请参见windows。**警告:**在许多情况下,这**是非并行**转换。所有记录将收集在windowAll 算子的一个任务中。
+

```
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
```

-</figure>

- |
-| **Window Apply**
-WindowedStream→DataStream
-AllWindowedStream→DataStream | 将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。**注意:**如果您正在使用windowAll转换,则需要使用AllWindowFunction。

-<figure class="highlight">
+---
+
+转型:**Window Apply** WindowedStream→DataStream AllWindowedStream→DataStream
+
+描述:将一般函数应用于整个窗口。下面是一个手动求和窗口数据元的函数。**注意:**如果您正在使用windowAll转换,则需要使用AllWindowFunction。
+

```
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
@@ -211,13 +220,14 @@ allWindowedStream.apply (new AllWindowFunction<Tuple2<String,Integer>,
 });
```

-</figure>

- |
-| **Window Reduce**
-WindowedStream→DataStream | 将函数缩减函数应用于窗口并返回缩小的值。

-<figure class="highlight">
+---
+
+转型:**Window Reduce** WindowedStream→DataStream
+
+描述:将一个Reduce函数应用于窗口并返回缩减后的值。
+

```
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
@@ -227,13 +237,14 @@ windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>()
 });
```

-</figure>

- |
-| **Window Fold**
-WindowedStream→DataStream | 将函数折叠函数应用于窗口并返回折叠值。示例函数应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”:

-<figure class="highlight">
+---
+
+转型:**Window Fold** WindowedStream→DataStream
+
+描述:将一个Fold函数应用于窗口并返回折叠后的值。示例函数应用于序列(1,2,3,4,5)时,将序列折叠为字符串“start-1-2-3-4-5”:
+

```
windowedStream.fold("start", new FoldFunction<Integer, String>() {
@@ -243,13 +254,14 @@ windowedStream.fold("start", new FoldFunction<Integer, String>() {
 });
```

-</figure>

- |
-| **Windows上的聚合**
-WindowedStream→DataStream | 聚合窗口的内容。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。

-<figure class="highlight">
+---
+
+转型:**Windows上的聚合** WindowedStream→DataStream
+
+描述:聚合窗口的内容。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。
+

```
windowedStream.sum(0);
@@ -264,25 +276,27 @@ windowedStream.maxBy(0);
windowedStream.maxBy("key");
```

-</figure>

- |
-| **Union**
-DataStream *→DataStream | 两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。

-<figure class="highlight">
+---
+
+转型:**Union** DataStream*→DataStream
+
+描述:两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元。 + ``` dataStream.union(otherStream1, otherStream2, ...); ``` -</figure> - | -| **Window Join** -DataStream,DataStream→DataStream | 在给定Keys和公共窗口上连接两个数据流。 -<figure class="highlight"> +--- + +转型:**Window Join** DataStream,DataStream→DataStream + +描述:在给定Keys和公共窗口上连接两个数据流。 + ``` dataStream.join(otherStream) @@ -291,13 +305,14 @@ dataStream.join(otherStream) .apply (new JoinFunction () {...}); ``` -</figure> - | -| **Interval Join** -KeyedStream,KeyedStream→DataStream | 在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound -<figure class="highlight"> +--- + +转型:**Interval Join** KeyedStream,KeyedStream→DataStream + +描述:在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound + ``` // this will join the two streams so that @@ -309,13 +324,14 @@ keyedStream.intervalJoin(otherKeyedStream) .process(new IntervalJoinFunction() {...}); ``` -</figure> - | -| **Window CoGroup** -DataStream,DataStream→DataStream | 在给定Keys和公共窗口上对两个数据流进行Cogroup。 -<figure class="highlight"> +--- + +转型:**Window CoGroup** DataStream,DataStream→DataStream + +描述:在给定Keys和公共窗口上对两个数据流进行Cogroup。 + ``` dataStream.coGroup(otherStream) @@ -324,13 +340,14 @@ dataStream.coGroup(otherStream) .apply (new CoGroupFunction () {...}); ``` -</figure> - | -| **连接** -DataStream,DataStream→ConnectedStreams | “连接”两个保存其类型的数据流。连接允许两个流之间的共享状态。 -<figure class="highlight"> +--- + +转型:**连接** DataStream,DataStream→ConnectedStreams + +描述:“连接”两个保存其类型的数据流。连接允许两个流之间的共享状态。 + ``` DataStream<Integer> someStream = //... @@ -339,13 +356,14 @@ DataStream<String> otherStream = //... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream); ``` -</figure> - | -| **CoMap,CoFlatMap** -ConnectedStreams→DataStream | 类似于连接数据流上的map和flatMap -<figure class="highlight"> +--- + +转型:**CoMap,CoFlatMap** ConnectedStreams→DataStream + +描述:类似于连接数据流上的map和flatMap + ``` connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() { @@ -375,13 +393,14 @@ connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() }); ``` -</figure> - | -| **拆分** -DataStream→SplitStream | 根据某些标准将流拆分为两个或更多个流。 -<figure class="highlight"> +--- + +转型:**拆分** DataStream→SplitStream + +描述:根据某些标准将流拆分为两个或更多个流。 + ``` SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() { @@ -399,13 +418,14 @@ SplitStream<Integer> split = someDataStream.split(new OutputSelector<In }); ``` -</figure> - | -| **选择** -SplitStream→DataStream | 从拆分流中选择一个或多个流。 -<figure class="highlight"> +--- + +转型:**选择** SplitStream→DataStream + +描述:从拆分流中选择一个或多个流。 + ``` SplitStream<Integer> split; @@ -414,13 +434,14 @@ DataStream<Integer> odd = split.select("odd"); DataStream<Integer> all = split.select("even","odd"); ``` -</figure> - | -| **迭代** -DataStream→IterativeStream→DataStream | 通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅[迭代](#iterations)。 -<figure class="highlight"> +--- + +转型:**迭代** DataStream→IterativeStream→DataStream + +描述:通过将一个 算子的输出重定向到某个先前的 算子,在流中创建“反馈”循环。这对于定义不断更新模型的算法特别有用。以下代码以流开头并连续应用迭代体。大于0的数据元将被发送回反馈通道,其余数据元将向下游转发。有关完整说明,请参阅[迭代](#iterations)。 + ``` IterativeStream<Long> iteration = initialStream.iterate(); @@ -440,105 +461,111 @@ DataStream<Long> output = iterationBody.filter(new FilterFunction<Long& }); ``` -</figure> - | -| **提取时间戳** -DataStream→DataStream | 
从记录中提取时间戳,以便使用使用事件时间语义的窗口。查看[活动时间](https://flink.sojb.cn/dev/event_time.html)。

-<figure class="highlight">
+---
+
+转型:**提取时间戳** DataStream→DataStream
+
+描述:从记录中提取时间戳,以便使用采用事件时间语义的窗口。请参阅[事件时间](https://flink.sojb.cn/dev/event_time.html)。
+

```
stream.assignTimestamps (new TimeStampExtractor() {...});
```

-</figure>

- |
+

-| Transformation | Description |
-| --- | --- |
-| **Map**
-DataStream → DataStream | Takes one element and produces one element. A map function that doubles the values of the input stream:

-<figure class="highlight">
+
+---
+
+转型:**Map** DataStream → DataStream
+
+描述:Takes one element and produces one element. A map function that doubles the values of the input stream:
+

```
dataStream.map { x => x * 2 }
```

-</figure>

- |
-| **FlatMap**
-DataStream → DataStream | Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:

-<figure class="highlight">
+---
+
+转型:**FlatMap** DataStream → DataStream
+
+描述:Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
+

```
dataStream.flatMap { str => str.split(" ") }
```

-</figure>

- |
-| **Filter**
-DataStream → DataStream | Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:

-<figure class="highlight">
+---
+
+转型:**Filter** DataStream → DataStream
+
+描述:Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
+

```
dataStream.filter { _ != 0 }
```

-</figure>

- |
-| **KeyBy**
-DataStream → KeyedStream | Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See [keys](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys) on how to specify keys. This transformation returns a KeyedStream.

-<figure class="highlight">
+---
+
+转型:**KeyBy** DataStream → KeyedStream
+
+描述:Logically partitions a stream into disjoint partitions, each partition containing elements of the same key. Internally, this is implemented with hash partitioning. See [keys](https://flink.sojb.cn/dev/api_concepts.html#specifying-keys) on how to specify keys. This transformation returns a KeyedStream.
+

```
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
```

-</figure>

- |
-| **Reduce**
-KeyedStream → DataStream | A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
+
+---
+
+转型:**Reduce** KeyedStream → DataStream
+
+描述:A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.

A reduce function that creates a stream of partial sums:

-<figure class="highlight">

```
keyedStream.reduce { _ + _ }
```

-</figure>
-</p> |
-| **Fold**
-KeyedStream → DataStream | A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
+
+---
+
+转型:**Fold** KeyedStream → DataStream
+
+描述:A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.

A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
-<figure class="highlight">

```
val result: DataStream[String] =
 keyedStream.fold("start")((str, i) => { str + "-" + i })
```

-</figure>

- |
-| **Aggregations**
-KeyedStream → DataStream | Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).

-<figure class="highlight">
+---
+
+转型:**Aggregations** KeyedStream → DataStream
+
+描述:Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
+

```
keyedStream.sum(0)
@@ -553,38 +580,38 @@ keyedStream.maxBy(0)
keyedStream.maxBy("key")
```

-</figure>

- |
-| **Window**
-KeyedStream → WindowedStream | Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a description of windows.

-<figure class="highlight">
+---
+
+转型:**Window** KeyedStream → WindowedStream
+
+描述:Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a description of windows.
+

```
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
```

-</figure>

- |
-| **WindowAll**
-DataStream → AllWindowedStream | Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a complete description of windows.**WARNING:** This is in many cases a **non-parallel** transformation. All records will be gathered in one task for the windowAll operator.

-<figure class="highlight">
+---
+
+转型:**WindowAll** DataStream → AllWindowedStream
+
+描述:Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See [windows](windows.html) for a complete description of windows.**WARNING:** This is in many cases a **non-parallel** transformation. All records will be gathered in one task for the windowAll operator.
+

```
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // Last 5 seconds of data
```

-</figure>

- |
-| **Window Apply**
-WindowedStream → DataStream
-AllWindowedStream → DataStream | Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.**Note:** If you are using a windowAll transformation, you need to use an AllWindowFunction instead.

-<figure class="highlight">
+---
+
+转型:**Window Apply** WindowedStream → DataStream AllWindowedStream → DataStream
+
+描述:Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.**Note:** If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
+ ``` windowedStream.apply { WindowFunction } @@ -592,38 +619,41 @@ windowedStream.apply { WindowFunction } // applying an AllWindowFunction on non-keyed window stream allWindowedStream.apply { AllWindowFunction } ``` -</figure> - | -| **Window Reduce** -WindowedStream → DataStream | Applies a functional reduce function to the window and returns the reduced value. -<figure class="highlight"> +--- + +转型:**Window Reduce** WindowedStream → DataStream + +描述:Applies a functional reduce function to the window and returns the reduced value. + ``` windowedStream.reduce { _ + _ } ``` -</figure> - | -| **Window Fold** -WindowedStream → DataStream | Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5": -<figure class="highlight"> +--- + +转型:**Window Fold** WindowedStream → DataStream + +描述:Applies a functional fold function to the window and returns the folded value. The example function, when applied on the sequence (1,2,3,4,5), folds the sequence into the string "start-1-2-3-4-5": + ``` val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i }) ``` -</figure> - | -| **Aggregations on windows** -WindowedStream → DataStream | Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). -<figure class="highlight"> +--- + +转型:**Aggregations on windows** WindowedStream → DataStream + +描述:Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy). + ``` windowedStream.sum(0) @@ -638,25 +668,27 @@ windowedStream.maxBy(0) windowedStream.maxBy("key") ``` -</figure> - | -| **Union** -DataStream* → DataStream | Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream. -<figure class="highlight"> +--- + +转型:**Union** DataStream* → DataStream + +描述:Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream. + ``` dataStream.union(otherStream1, otherStream2, ...) ``` -</figure> - | -| **Window Join** -DataStream,DataStream → DataStream | Join two data streams on a given key and a common window. -<figure class="highlight"> +--- + +转型:**Window Join** DataStream,DataStream → DataStream + +描述:Join two data streams on a given key and a common window. + ``` dataStream.join(otherStream) @@ -665,13 +697,14 @@ dataStream.join(otherStream) .apply { ... } ``` -</figure> - | -| **Window CoGroup** -DataStream,DataStream → DataStream | Cogroups two data streams on a given key and a common window. -<figure class="highlight"> +--- + +转型:**Window CoGroup** DataStream,DataStream → DataStream + +描述:Cogroups two data streams on a given key and a common window. + ``` dataStream.coGroup(otherStream) @@ -680,13 +713,14 @@ dataStream.coGroup(otherStream) .apply {} ``` -</figure> - | -| **Connect** -DataStream,DataStream → ConnectedStreams | "Connects" two data streams retaining their types, allowing for shared state between the two streams. 
-<figure class="highlight"> +--- + +转型:**Connect** DataStream,DataStream → ConnectedStreams + +描述:"Connects" two data streams retaining their types, allowing for shared state between the two streams. + ``` someStream : DataStream[Int] = ... @@ -695,13 +729,14 @@ otherStream : DataStream[String] = ... val connectedStreams = someStream.connect(otherStream) ``` -</figure> - | -| **CoMap, CoFlatMap** -ConnectedStreams → DataStream | Similar to map and flatMap on a connected data stream -<figure class="highlight"> +--- + +转型:**CoMap, CoFlatMap** ConnectedStreams → DataStream + +描述:Similar to map and flatMap on a connected data stream + ``` connectedStreams.map( @@ -714,13 +749,14 @@ connectedStreams.flatMap( ) ``` -</figure> - | -| **Split** -DataStream → SplitStream | Split the stream into two or more streams according to some criterion. -<figure class="highlight"> +--- + +转型:**Split** DataStream → SplitStream + +描述:Split the stream into two or more streams according to some criterion. + ``` val split = someDataStream.split( @@ -732,13 +768,14 @@ val split = someDataStream.split( ) ``` -</figure> - | -| **Select** -SplitStream → DataStream | Select one or more streams from a split stream. -<figure class="highlight"> +--- + +转型:**Select** SplitStream → DataStream + +描述:Select one or more streams from a split stream. + ``` val even = split select "even" @@ -746,13 +783,14 @@ val odd = split select "odd" val all = split.select("even","odd") ``` -</figure> - | -| **Iterate** -DataStream → IterativeStream → DataStream | Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](#iterations) for a complete description. -<figure class="highlight"> +--- + +转型:**Iterate** DataStream → IterativeStream → DataStream + +描述:Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](#iterations) for a complete description. + ``` initialStream.iterate { @@ -763,21 +801,21 @@ initialStream.iterate { } ``` -</figure> - | -| **Extract Timestamps** -DataStream → DataStream | Extracts timestamps from records in order to work with windows that use event time semantics. See [Event Time](https://flink.sojb.cn/dev/event_time.html). -<figure class="highlight"> +--- + +转型:**Extract Timestamps** DataStream → DataStream + +描述:Extracts timestamps from records in order to work with windows that use event time semantics. See [Event Time](https://flink.sojb.cn/dev/event_time.html). + ``` stream.assignTimestamps { timestampExtractor } ``` -</figure> - | + Extraction from tuples, case classes and collections via anonymous pattern matching, like the following: @@ -796,21 +834,21 @@ is not supported by the API out-of-the-box. 
To use this feature, you should use * [**Java**](#tab_java_1) -| 转型 | 描述 | -| --- | --- | -| **Project** -DataStream→DataStream | 从元组中选择字段的子集 -<figure class="highlight"> +--- + +转型:**Project** DataStream→DataStream + +描述:从元组中选择字段的子集 + ``` DataStream<Tuple3<Integer, Double, String>> in = // [...] DataStream<Tuple2<String, Integer>> out = in.project(2,0); ``` -</figure> - | + # 物理分区 @@ -822,133 +860,141 @@ Flink还通过以下函数对转换后的精确流分区进行低级控制(如 * [**Java**](#tab_java_2) * [**Scala**](#tab_scala_2) -| 转型 | 描述 | -| --- | --- | -| **自定义分区** -DataStream→DataStream | 使用用户定义的分区程序为每个数据元选择目标任务。 -<figure class="highlight"> +--- + +转型:**自定义分区** DataStream→DataStream + +描述:使用用户定义的分区程序为每个数据元选择目标任务。 + ``` dataStream.partitionCustom(partitioner, "someKey"); dataStream.partitionCustom(partitioner, 0); ``` -</figure> - | -| **随机分区** -DataStream→DataStream | 根据均匀分布随机分配数据元。 -<figure class="highlight"> +--- + +转型:**随机分区** DataStream→DataStream + +描述:根据均匀分布随机分配数据元。 + ``` dataStream.shuffle(); ``` -</figure> - | -| **Rebalance (循环分区)** -DataStream→DataStream | 分区数据元循环,每个分区创建相等的负载。在存在数据偏斜时用于性能优化。 -<figure class="highlight"> +--- + +转型:**Rebalance (循环分区)** DataStream→DataStream + +描述:分区数据元循环,每个分区创建相等的负载。在存在数据偏斜时用于性能优化。 + ``` dataStream.rebalance(); ``` -</figure> - | -| **重新调整** -DataStream→DataStream | 分区数据元,循环,到下游 算子操作的子集。如果您希望拥有管道,例如,从源的每个并行实例扇出到多个映射器的子集以分配负载但又不希望发生rebalance()会产生完全Rebalance ,那么这非常有用。这将仅需要本地数据传输而不是通过网络传输数据,具体取决于其他配置值,例如TaskManagers的插槽数。上游 算子操作发送数据元的下游 算子操作的子集取决于上游和下游 算子操作的并行度。例如,如果上游 算子操作具有并行性2并且下游 算子操作具有并行性6,则一个上游 算子操作将分配元件到三个下游 算子操作,而另一个上游 算子操作将分配到其他三个下游 算子操作。另一方面,如果下游 算子操作具有并行性2而上游 算子操作具有并行性6,则三个上游 算子操作将分配到一个下游 算子操作,而其他三个上游 算子操作将分配到另一个下游 算子操作。在不同并行度不是彼此的倍数的情况下,一个或多个下游 算子操作将具有来自上游 算子操作的不同数量的输入。请参阅此图以获取上例中连接模式的可视化:![数据流中的检查点障碍](../img/rescale.svg) -<figure class="highlight"> +--- + +转型:**重新调整** DataStream→DataStream + +描述:分区数据元,循环,到下游 算子操作的子集。如果您希望拥有管道,例如,从源的每个并行实例扇出到多个映射器的子集以分配负载但又不希望发生rebalance()会产生完全Rebalance ,那么这非常有用。这将仅需要本地数据传输而不是通过网络传输数据,具体取决于其他配置值,例如TaskManagers的插槽数。上游 算子操作发送数据元的下游 算子操作的子集取决于上游和下游 算子操作的并行度。例如,如果上游 算子操作具有并行性2并且下游 算子操作具有并行性6,则一个上游 算子操作将分配元件到三个下游 算子操作,而另一个上游 算子操作将分配到其他三个下游 算子操作。另一方面,如果下游 算子操作具有并行性2而上游 算子操作具有并行性6,则三个上游 算子操作将分配到一个下游 算子操作,而其他三个上游 算子操作将分配到另一个下游 算子操作。在不同并行度不是彼此的倍数的情况下,一个或多个下游 算子操作将具有来自上游 算子操作的不同数量的输入。请参阅此图以获取上例中连接模式的可视化:![数据流中的检查点障碍](../img/rescale.svg) + ``` dataStream.rescale(); ``` -</figure> - | -| **广播** -DataStream→DataStream | 向每个分区广播数据元。 -<figure class="highlight"> +--- + +转型:**广播** DataStream→DataStream + +描述:向每个分区广播数据元。 + ``` dataStream.broadcast(); ``` -</figure> - | -| Transformation | Description | -| --- | --- | -| **Custom partitioning** -DataStream → DataStream | Uses a user-defined Partitioner to select the target task for each element. -<figure class="highlight"> + +--- + +转型:**Custom partitioning** DataStream → DataStream + +描述:Uses a user-defined Partitioner to select the target task for each element. + ``` dataStream.partitionCustom(partitioner, "someKey") dataStream.partitionCustom(partitioner, 0) ``` -</figure> - | -| **Random partitioning** -DataStream → DataStream | Partitions elements randomly according to a uniform distribution. -<figure class="highlight"> +--- + +转型:**Random partitioning** DataStream → DataStream + +描述:Partitions elements randomly according to a uniform distribution. + ``` dataStream.shuffle() ``` -</figure> - | -| **Rebalancing (Round-robin partitioning)** -DataStream → DataStream | Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. 
-<figure class="highlight"> +--- + +转型:**Rebalancing (Round-robin partitioning)** DataStream → DataStream + +描述:Partitions elements round-robin, creating equal load per partition. Useful for performance optimization in the presence of data skew. + ``` dataStream.rebalance() ``` -</figure> - | -| **Rescaling** -DataStream → DataStream | Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operations.In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations.</p> Please see this figure for a visualization of the connection pattern in the above example: </p>![Checkpoint barriers in data streams](../img/rescale.svg) -<figure class="highlight"> +--- + +转型:**Rescaling** DataStream → DataStream + +描述:Partitions elements, round-robin, to a subset of downstream operations. This is useful if you want to have pipelines where you, for example, fan out from each parallel instance of a source to a subset of several mappers to distribute load but don't want the full rebalance that rebalance() would incur. This would require only local data transfers instead of transferring data over network, depending on other configuration values such as the number of slots of TaskManagers.The subset of downstream operations to which the upstream operation sends elements depends on the degree of parallelism of both the upstream and downstream operation. For example, if the upstream operation has parallelism 2 and the downstream operation has parallelism 4, then one upstream operation would distribute elements to two downstream operations while the other upstream operation would distribute to the other two downstream operations. 
If, on the other hand, the downstream operation has parallelism 2 while the upstream operation has parallelism 4 then two upstream operations would distribute to one downstream operation while the other two upstream operations would distribute to the other downstream operations. In cases where the different parallelisms are not multiples of each other one or several downstream operations will have a differing number of inputs from upstream operations. Please see this figure for a visualization of the connection pattern in the above example: ![Checkpoint barriers in data streams](../img/rescale.svg)
+

```
dataStream.rescale()
```

-</figure>

- |
-| **Broadcasting**
-DataStream → DataStream | Broadcasts elements to every partition.

-<figure class="highlight">
+---
+
+转型:**Broadcasting** DataStream → DataStream
+
+描述:Broadcasts elements to every partition.
+

```
dataStream.broadcast()
```

-</figure>

- |
+

# 任务链和资源组

@@ -964,75 +1010,85 @@ dataStream.broadcast()

 * [**Java**](#tab_java_3)
 * [**Scala**](#tab_scala_3)

-| 转型 | 描述 |
-| --- | --- |
-| 开始新的链条 | 从这个 算子开始,开始一个新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器。

-<figure class="highlight">
+---
+
+转型:开始新的链条
+
+描述:从这个 算子开始,开始一个新的链。两个映射器将被链接,并且过滤器将不会链接到第一个映射器。
+

```
someStream.filter(...).map(...).startNewChain().map(...);
```

-</figure>

- |
-| 禁用链接 | 不要链接Map 算子

-<figure class="highlight">
+---
+
+转型:禁用链接
+
+描述:不要链接Map 算子
+

```
someStream.map(...).disableChaining();
```

-</figure>

- |
-| 设置插槽共享组 | 设置 算子操作的插槽共享组。Flink将把具有相同插槽共享组的 算子操作放入同一个插槽,同时保持其他插槽中没有插槽共享组的 算子操作。这可用于隔离插槽。如果所有输入 算子操作都在同一个插槽共享组中,则插槽共享组将继承输入 算子操作。默认插槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)将 算子操作显式放入此组中。

-<figure class="highlight">
+---
+
+转型:设置插槽共享组
+
+描述:设置 算子操作的插槽共享组。Flink会把具有相同插槽共享组的 算子操作放入同一个插槽,同时把没有插槽共享组的 算子操作保留在其他插槽中。这可用于隔离插槽。如果所有输入 算子操作都在同一个插槽共享组中,则插槽共享组将从输入 算子操作继承。默认插槽共享组的名称为“default”,可以通过调用slotSharingGroup(“default”)将 算子操作显式放入此组中。
+

```
someStream.filter(...).slotSharingGroup("name");
```

-</figure>

- |
+

-| Transformation | Description |
-| --- | --- |
-| Start new chain | Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.

-<figure class="highlight">
+
+---
+
+转型:Start new chain
+
+描述:Begin a new chain, starting with this operator. The two mappers will be chained, and filter will not be chained to the first mapper.
+

```
someStream.filter(...).map(...).startNewChain().map(...)
```

-</figure>

- |
-| Disable chaining | Do not chain the map operator

-<figure class="highlight">
+---
+
+转型:Disable chaining
+
+描述:Do not chain the map operator
+

```
someStream.map(...).disableChaining()
```

-</figure>

- |
-| Set slot sharing group | Set the slot sharing group of an operation. Flink will put operations with the same slot sharing group into the same slot while keeping operations that don't have the slot sharing group in other slots. This can be used to isolate slots.
The slot sharing group is inherited from input operations if all input operations are in the same slot sharing group. The name of the default slot sharing group is "default", operations can explicitly be put into this group by calling slotSharingGroup("default").
+

```
someStream.filter(...).slotSharingGroup("name")
```

-</figure>

- |
+

diff --git a/docs/1.7-SNAPSHOT/79.md b/docs/1.7-SNAPSHOT/79.md
index 3eaaec8ae3302f2eca4379d32022bc3e746e3fda..1d7d1e90ecda41d897ad5d012595ee7c75d75307 100644
--- a/docs/1.7-SNAPSHOT/79.md
+++ b/docs/1.7-SNAPSHOT/79.md
@@ -7,12 +7,12 @@

 `Graph`用于组装API和顶级算法的逻辑块可在Gelly中作为`org.apache.flink.graph.asm`包中的图形算法访问。这些算法通过配置参数提供优化和调整,并且在使用类似配置处理相同输入时可以提供隐式运行时重用。

-| 算法 | 描述 |
-| --- | --- |
-| degree.annotate.directed。
-**VertexInDegree** | 使用in-degree 注释有[向图的](#graph-representation)顶点。
+---
+
+算法:`degree.annotate.directed.VertexInDegree`
+
+描述:使用in-degree注释[有向图](#graph-representation)的顶点。

-<figure class="highlight">

```
DataSet<Vertex<K, LongValue>> inDegree = graph
@@ -20,7 +20,6 @@ DataSet<Vertex<K, LongValue>> inDegree = graph
 .setIncludeZeroDegreeVertices(true));
```

-</figure>

可选配置:

@@ -28,11 +27,12 @@ DataSet<Vertex<K, LongValue>> inDegree = graph

* **setParallelism**:覆盖 算子并行度

- |
-| degree.annotate.directed。
-**VertexOutDegree** | 使用out-degree 注释有[向图的](#graph-representation)顶点。
+---
+
+算法:`degree.annotate.directed.VertexOutDegree`
+
+描述:使用out-degree注释[有向图](#graph-representation)的顶点。

-<figure class="highlight">

```
DataSet<Vertex<K, LongValue>> outDegree = graph
@@ -40,7 +40,6 @@ DataSet<Vertex<K, LongValue>> outDegree = graph
 .setIncludeZeroDegreeVertices(true));
```

-</figure>

可选配置:

@@ -48,11 +47,12 @@ DataSet<Vertex<K, LongValue>> outDegree = graph

* **setParallelism**:覆盖 算子并行度

- |
-| degree.annotate.directed。
-**VertexDegrees** | 使用degree,out-degree和in-degree 注释有[向图的](#graph-representation)顶点。
+---
+
+算法:`degree.annotate.directed.VertexDegrees`
+
+描述:使用度数、出度和入度注释[有向图](#graph-representation)的顶点。

-<figure class="highlight">

```
DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degrees = graph
@@ -60,7 +60,6 @@ DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degrees = gra
 .setIncludeZeroDegreeVertices(true));
```

-</figure>

可选配置:

@@ -68,62 +67,63 @@ DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degrees = gra

* **setParallelism**:覆盖 算子并行度

- |
-| degree.annotate.directed。
-**EdgeSourceDegrees** | 使用源ID的度,出度和度数来标注有[向图的](#graph-representation)边。
+---
+
+算法:`degree.annotate.directed.EdgeSourceDegrees`
+
+描述:使用源ID的度数、出度和入度来标注[有向图](#graph-representation)的边。

-<figure class="highlight">

```
DataSet<Edge<K, Tuple2<EV, Degrees>>> sourceDegrees = graph
 .run(new EdgeSourceDegrees());
```

-</figure>

可选配置:

* **setParallelism**:覆盖 算子并行度

- |
-| degree.annotate.directed。
-**EdgeTargetDegrees** | 使用目标ID的度,出度和度数来标注有[向图的](#graph-representation)边。
+---
+
+算法:`degree.annotate.directed.EdgeTargetDegrees`
+
+描述:使用目标ID的度数、出度和入度来标注[有向图](#graph-representation)的边。

-<figure class="highlight">

```
DataSet<Edge<K, Tuple2<EV, Degrees>>> targetDegrees = graph
 .run(new EdgeTargetDegrees());
```

-</figure>

可选配置:

* **setParallelism**:覆盖 算子并行度

- |
-| degree.annotate.directed。
-**EdgeDegreesPair** | 使用源顶点和目标顶点的度,出度和度数来标注有[向图](#graph-representation)的边。
+---
+
+算法:`degree.annotate.directed.EdgeDegreesPair`
+
+描述:使用源顶点和目标顶点的度数、出度和入度来标注[有向图](#graph-representation)的边。

-<figure class="highlight">

```
DataSet<Edge<K, Tuple2<EV, Degrees>>> degrees = graph
 .run(new EdgeDegreesPair());
```

-</figure>

可选配置:

* **setParallelism**:覆盖 算子并行度

- |
-| degree.annotate.undirected。
-**VertexDegree** 
| 用度数注释[无向图的](#graph-representation)顶点。
+---
+
+算法:`degree.annotate.undirected.VertexDegree`
+
+描述:用度数注释[无向图](#graph-representation)的顶点。

-<figure class="highlight">

```
DataSet<Vertex<K, LongValue>> degree = graph
@@ -132,7 +132,6 @@ DataSet<Vertex<K, LongValue>> degree = graph
 .setReduceOnTargetId(true));
```

-</figure>

可选配置:

@@ -142,11 +141,12 @@ DataSet<Vertex<K, LongValue>> degree = graph

* **setReduceOnTargetId**:可以从边缘源ID或目标ID计算度数。默认情况下,会计算源ID。如果输入边缘列表按目标ID排序,则Reduce目标ID可以优化算法。

- |
-| degree.annotate.undirected。
-**EdgeSourceDegree** | 使用源ID的度数注释[无向图的](#graph-representation)边。
+---
+
+算法:`degree.annotate.undirected.EdgeSourceDegree`
+
+描述:使用源ID的度数注释[无向图](#graph-representation)的边。

-<figure class="highlight">

```
DataSet<Edge<K, Tuple2<EV, LongValue>>> sourceDegree = graph
@@ -154,7 +154,6 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> sourceDegree = graph
 .setReduceOnTargetId(true));
```

-</figure>

可选配置:

@@ -162,11 +161,12 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> sourceDegree = graph

* **setReduceOnTargetId**:可以从边缘源ID或目标ID计算度数。默认情况下,会计算源ID。如果输入边缘列表按目标ID排序,则Reduce目标ID可以优化算法。

- |
-| degree.annotate.undirected。
-**EdgeTargetDegree** | 使用目标ID的度数注释[无向图的](#graph-representation)边。
+---
+
+算法:`degree.annotate.undirected.EdgeTargetDegree`
+
+描述:使用目标ID的度数注释[无向图](#graph-representation)的边。

-<figure class="highlight">

```
DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph
@@ -174,7 +174,6 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph
 .setReduceOnSourceId(true));
```

-</figure>

可选配置:

@@ -182,11 +181,12 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph

* **setReduceOnSourceId**:可以从边缘源ID或目标ID计算度。默认情况下,计算目标ID。如果输入边缘列表按源ID排序,则Reduce源ID可以优化算法。

- |
-| degree.annotate.undirected。
-**EdgeDegreePair** | 使用源顶点和目标顶点的度数注释[无向图](#graph-representation)的边。
+---
+
+算法:`degree.annotate.undirected.EdgeDegreePair`
+
+描述:使用源顶点和目标顶点的度数注释[无向图](#graph-representation)的边。

-<figure class="highlight">

```
DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
@@ -194,7 +194,6 @@ DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree
 .setReduceOnTargetId(true));
```

-</figure>

可选配置:

@@ -202,11 +201,12 @@ DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree

* **setReduceOnTargetId**:可以从边缘源ID或目标ID计算度数。默认情况下,会计算源ID。如果输入边缘列表按目标ID排序,则Reduce目标ID可以优化算法。

- |
-| degree.filter.undirected。
-**MaximumDegree** | 按最大程度Filter[无向图](#graph-representation)。
+---
+
+算法:`degree.filter.undirected.MaximumDegree`
+
+描述:按最大度数过滤[无向图](#graph-representation)。

-<figure class="highlight">

```
Graph<K, VV, EV> filteredGraph = graph
@@ -215,7 +215,6 @@ Graph<K, VV, EV> filteredGraph = graph
 .setReduceOnTargetId(true));
```

-</figure>

可选配置:

@@ -225,49 +224,49 @@ Graph<K, VV, EV> filteredGraph = graph

* **setReduceOnTargetId**:可以从边缘源ID或目标ID计算度数。默认情况下,会计算源ID。如果输入边缘列表按目标ID排序,则Reduce目标ID可以优化算法。

- |
-| simple.directed。
-**简化** | 从有[向图中](#graph-representation)删除自循环和重复边。
+---
+
+算法:`simple.directed.Simplify`
+
+描述:从[有向图](#graph-representation)中删除自环和重复边。

-<figure class="highlight">

```
graph.run(new Simplify());
```

-</figure>

可选配置:

* **setParallelism**:覆盖 算子并行度

- |
-| simple.undirected。
-**简化** | 添加对称边并[从无向图中](#graph-representation)移除自循环和复制边。
+---
+
+算法:`simple.undirected.Simplify`
+
+描述:添加对称边,并从[无向图](#graph-representation)中移除自环和重复边。

-<figure class="highlight">

```
graph.run(new Simplify());
```

-</figure>

可选配置:

* **setParallelism**:覆盖 算子并行度

- |
-| 翻译。
-**TranslateGraphIds** | 使用给定的转换顶点和边ID `TranslateFunction`。
+---
+
+算法:`translate.TranslateGraphIds`
+
+描述:使用给定的`TranslateFunction`转换顶点和边的ID。

-<figure class="highlight">

```
graph.run(new TranslateGraphIds(new LongValueToStringValue()));
```

-</figure>

所需配置:

@@ -277,17 +276,17 @@ graph.run(new TranslateGraphIds(new LongValueToStringValue()));

* **setParallelism**:覆盖 算子并行度

- |
-| 翻译。
-**TranslateVertexValues** | 使用给定的转换顶点值`TranslateFunction`。
+---
+
+算法:`translate.TranslateVertexValues`
+
+描述:使用给定的`TranslateFunction`转换顶点值。

-<figure class="highlight">

```
graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
```

-</figure>

所需配置:

@@ -297,17 +296,17 @@ graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));

* **setParallelism**:覆盖 算子并行度

- |
-| 翻译。
-**TranslateEdgeValues** | 使用给定的转换边缘值`TranslateFunction`。
+---
+
+算法:`translate.TranslateEdgeValues`
+
+描述:使用给定的`TranslateFunction`转换边值。

-<figure class="highlight">

```
graph.run(new TranslateEdgeValues(new Nullify()));
```

-</figure>

所需配置:

@@ -317,5 +316,4 @@ graph.run(new TranslateEdgeValues(new Nullify()));

* **setParallelism**:覆盖 算子并行度

- |