# Amazon AWS Kinesis Streams Connector

The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/).

To use the connector, add the following Maven dependency to your project:

```
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kinesis_2.11</artifactId>
  <version>1.7.1</version>
</dependency>
```

**The `flink-connector-kinesis_2.11` has a dependency on code licensed under the [Amazon Software License](https://aws.amazon.com/asl/) (ASL). Linking to the flink-connector-kinesis will include ASL licensed code into your application.**

The `flink-connector-kinesis_2.11` artifact is not deployed to Maven central as part of Flink releases because of the licensing issue. Therefore, you need to build the connector yourself from the source.

Download the Flink source or check it out from the git repository. Then, use the following Maven command to build the module:

```
mvn clean install -Pinclude-kinesis -DskipTests
# In Maven 3.3 the shading of flink-dist doesn't work properly in one run, so we need to run mvn for flink-dist again.
cd flink-dist
mvn clean install -Pinclude-kinesis -DskipTests
```

**Attention:** For Flink versions 1.4.2 and below, the KPL client version used by default in the Kinesis connectors, KPL 0.12.5, is no longer supported by AWS Kinesis Streams (see [here](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-upgrades.html)). This means that when building the Kinesis connector, you will need to specify a higher KPL client version (0.12.6 or above) in order for the Flink Kinesis Producer to work. You can do this by specifying the preferred version via the `aws.kinesis-kpl.version` property, like so:

```
mvn clean install -Pinclude-kinesis -Daws.kinesis-kpl.version=0.12.6 -DskipTests
```

The streaming connectors are not part of the binary distribution. See how to link with them for cluster execution [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html).

## Using the Amazon Kinesis Streams Service

Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.

## Kinesis Consumer

The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will change as shards are closed and created by Kinesis.

Before consuming data from Kinesis streams, make sure that all streams are created with the status “ACTIVE” in the AWS dashboard.

```
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
```


```
val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

val env = StreamExecutionEnvironment.getExecutionEnvironment

val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
```

The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties` instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example demonstrates consuming a single Kinesis stream in the AWS region “us-east-1”. The AWS credentials are supplied using the basic method in which the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting `ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, or `AUTO`). Also, data is being consumed from the newest position in the Kinesis stream (the other option is setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
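
For example, a minimal sketch that resolves the credentials from environment variables instead of passing them explicitly, and starts reading from the earliest retained record (both options are listed above):

```
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
// Resolve credentials from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables
consumerConfig.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "ENV_VAR");
// Start reading from the earliest record still retained by the stream
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
```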

Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.

Note that the configured parallelism of the Flink Kinesis Consumer source can be completely independent of the total number of shards in the Kinesis streams. When the number of shards is larger than the parallelism of the consumer, each consumer subtask can subscribe to multiple shards; otherwise, if the number of shards is smaller than the parallelism of the consumer, some consumer subtasks will simply be idle and wait until they get assigned new shards (i.e., when the streams are resharded to increase the number of shards for higher provisioned Kinesis service throughput).
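
For instance, a minimal sketch of setting the source parallelism explicitly, continuing the earlier example, independent of the stream's shard count:

```
DataStream<String> kinesis = env
    .addSource(new FlinkKinesisConsumer<>(
        "kinesis_stream_name", new SimpleStringSchema(), consumerConfig))
    .setParallelism(10); // independent of how many shards the stream has
```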

Also note that the assignment of shards to subtasks may not be optimal when shard IDs are not consecutive (as a result of dynamic re-sharding in Kinesis). For cases where skew in the assignment leads to significantly imbalanced consumption, a custom implementation of `KinesisShardAssigner` can be set on the consumer.
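
A minimal sketch of such an assigner, which simply spreads shards across subtasks by hashing the full shard ID (the routing logic is illustrative, not a recommendation):

```
FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig);
// Assign each shard by hashing its shard ID rather than relying on
// shard IDs being consecutive
consumer.setShardAssigner((shard, numParallelSubtasks) ->
    Math.abs(shard.getShard().getShardId().hashCode() % numParallelSubtasks));
```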

### Configuring Starting Position

The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):

*   `LATEST`: read all shards of all streams starting from the latest record.
*   `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
*   `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date patterns:
    *   a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`).
    *   a user-defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`. If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, the timestamp value is `2016-04-04` with the user-given pattern `yyyy-MM-dd`, or the timestamp value is `2016-04-04T19:58:46.480-00:00` without a given pattern). A configuration sketch follows this list.
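
As a sketch, starting at a specific date with a user-defined pattern could be configured as follows (the values are illustrative):

```
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
// Interpret the timestamp below using this SimpleDateFormat pattern
consumerConfig.put(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04");
```
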
### Fault Tolerance for Exactly-Once User-Defined State Update Semantics

With Flink’s checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and periodically checkpoint each shard’s progress. In case of a job failure, Flink will restore the streaming program to the state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that was stored in the checkpoint.

The interval of drawing checkpoints therefore defines how far the program may have to go back at most, in case of a failure.

To use fault-tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled in the execution environment:

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000); // checkpoint every 5000 msecs
```


```
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
```


Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.

### Event Time for Consumed Records

```
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```


```
val env = StreamExecutionEnvironment.getExecutionEnvironment()
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
```


If streaming topologies choose to use the [event time notion](//ci.apache.org/projects/flink/flink-docs-release-1.7/apis/streaming/event_time.html) for record timestamps, an _approximate arrival timestamp_ will be used by default. This timestamp is attached to records by Kinesis once they have been successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side timestamp, and there are no guarantees about its accuracy or order correctness (i.e., the timestamps may not always be ascending).

Users can choose to override this default with a custom timestamp, as described [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/apis/streaming/event_timestamps_watermarks.html), or use one from the [predefined ones](//ci.apache.org/projects/flink/flink-docs-release-1.7/apis/streaming/event_timestamp_extractors.html). After doing so, it can be passed to the consumer in the following way:

```
DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
    "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner());
```


```
val kinesis = env.addSource(new FlinkKinesisConsumer[String](
    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
  .assignTimestampsAndWatermarks(new CustomTimestampAssigner)
```


### Threading Model

The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.

For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard information even if the subtask initially did not have shards to read from when the consumer was started. In other words, if the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless of the total amount of shards in the subscribed streams.

For data consumption, a single thread will be created to consume each discovered shard. A thread will terminate when the shard it is responsible for consuming is closed as a result of stream resharding. In other words, there will always be one thread per open shard.

### Internally Used Kinesis APIs

The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs for shard discovery and data consumption. Due to Amazon’s [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running. Below is a list of APIs called by the consumer with description of how the consumer uses the API, as well as information on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits.

*   _[DescribeStream](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)_: this is constantly called by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default, the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result from Kinesis. If this interferes with other non-Flink consuming applications, users can slow down the rate at which the consumer calls this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied configuration properties. This sets the discovery interval to a different value. Note that this setting directly impacts the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval.
*   _[GetShardIterator](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)_: this is called only once when per-shard consuming threads are started, and will retry if Kinesis complains that the transaction limit for the API has been exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream), the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down any other non-Flink consuming applications calling this API, or modify the retry behaviour of this API call in the consumer by setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties.
*   _[GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)_: this is constantly called by per-shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there are any other non-Flink consuming applications running), the per-shard rate limit may be exceeded. By default, on each call of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has been exceeded, up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties (see the sketch after this list). Setting the former adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 10,000), while the latter modifies the sleep interval between each fetch (default is 200). The retry behaviour of the consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
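
As a sketch, the polling behaviour described above could be tuned as follows, continuing the `consumerConfig` from the earlier example (the values are illustrative, not recommendations):

```
// Discover new shards every 30 seconds instead of the default 10
consumerConfig.put(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "30000");
// Fetch at most 5,000 records per GetRecords call (default is 10,000)
consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
// Sleep 500 ms between consecutive fetches (default is 200)
consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");
```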

## Kinesis Producer

The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.

Note that the producer is not participating in Flink’s checkpointing and doesn’t provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).

In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called “at-least-once” semantics.

To put data into a Kinesis stream, make sure the stream is marked as “ACTIVE” in the AWS dashboard.

For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.

```
Properties producerConfig = new Properties();
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Optional configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");
producerConfig.put("ThreadPoolSize", "15");

// Disable Aggregation if it's not supported by a consumer
// producerConfig.put("AggregationEnabled", "false");
// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("kinesis_stream_name");
kinesis.setDefaultPartition("0");

DataStream<String> simpleStringStream = ...;
simpleStringStream.addSink(kinesis);
```


```
val producerConfig = new Properties()
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
// Optional KPL configs
producerConfig.put("AggregationMaxCount", "4294967295")
producerConfig.put("CollectionMaxCount", "1000")
producerConfig.put("RecordTtl", "30000")
producerConfig.put("RequestTimeout", "6000")
producerConfig.put("ThreadPoolSize", "15")

// Disable Aggregation if it's not supported by a consumer
// producerConfig.put("AggregationEnabled", "false")
// Switch KinesisProducer's threading model
// producerConfig.put("ThreadingModel", "PER_REQUEST")

val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
kinesis.setFailOnError(true)
kinesis.setDefaultStream("kinesis_stream_name")
kinesis.setDefaultPartition("0")

val simpleStringStream = ...
simpleStringStream.addSink(kinesis)
```


The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL’s configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region “us-east-1”.

If users don’t specify any KPL configs and values, `FlinkKinesisProducer` will use the default config values of KPL, except for `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL’s default value is 150, but it makes KPL throw `RateLimitExceededException` too frequently and breaks the Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL’s default value to 100.

Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows sending the data to multiple streams. This is done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream. Otherwise, the returned stream name is used.
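
A minimal sketch of such a schema (the prefix-based routing and the `error_stream_name` stream are made up for illustration):

```
KinesisSerializationSchema<String> schema = new KinesisSerializationSchema<String>() {
    @Override
    public ByteBuffer serialize(String element) {
        return ByteBuffer.wrap(element.getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public String getTargetStream(String element) {
        // Returning null routes the element to the producer's default stream
        return element.startsWith("error:") ? "error_stream_name" : null;
    }
};

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(schema, producerConfig);
```
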
### Threading Model

Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool mode uses a queue and thread pool to execute requests to Kinesis. This limits the number of threads that KPL’s native process may create, and therefore greatly lowers CPU utilization and improves efficiency. **Thus, we highly recommend that Flink users use the thread-pool model.** The default thread pool size is `10`. Users can set the pool size in the `java.util.Properties` instance with the key `ThreadPoolSize`, as shown in the above example.

Users can still switch back to the one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in the `java.util.Properties`, as shown in the code commented out in the above example.

### Backpressure

By default, `FlinkKinesisProducer` does not backpressure. Instead, records that cannot be sent because of the rate restriction of 1 MB per second per shard are buffered in an unbounded queue and dropped when their `RecordTtl` expires.

To avoid data loss, you can enable backpressuring by restricting the size of the internal queue:

```
// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500); 
```

The value for `queueLimit` depends on the expected record size. To choose a good value, consider that Kinesis is rate-limited to 1MB per second per shard. If less than one second’s worth of records is buffered, then the queue may not be able to operate at full capacity. With the default `RecordMaxBufferedTime` of 100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit` can then be computed via

```
queue limit = (number of shards * queue size per shard) / record size 
```

E.g. for 200 bytes per record and 8 shards, a queue limit of 4000 is a good starting point. If the queue size limits throughput (below 1 MB per second per shard), try increasing the queue limit slightly.

## Using Non-AWS Kinesis Endpoints for Testing

It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.

To override the AWS endpoint, using the producer as an example, set the `AWSConfigConstants.AWS_ENDPOINT` property in the Flink configuration, in addition to the `AWSConfigConstants.AWS_REGION` required by Flink. Although the region is required, it will not be used to determine the AWS endpoint URL.

The following example shows how one might supply the `AWSConfigConstants.AWS_ENDPOINT` configuration property:

```
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
```


```
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
```
