The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/).
Kinesis连接器提供对Amazon AWS Kinesis Streams的访问。
To use the connector, add the following Maven dependency to your project:
要使用连接器,请将以下Maven依赖项添加到您的项目中:
```
...
...
@@ -19,11 +20,13 @@ To use the connector, add the following Maven dependency to your project:
**The `flink-connector-kinesis_2.11` has a dependency on code licensed under the [Amazon Software License](https://aws.amazon.com/asl/) (ASL). Linking to the flink-connector-kinesis will include ASL licensed code into your application.**
The `flink-connector-kinesis_2.11` artifact is not deployed to Maven central as part of Flink releases because of the licensing issue. Therefore, you need to build the connector yourself from the source.
Attention For Flink versions 1.4.2 and below, the KPL client version used by default in the Kinesis connectors, KPL 0.12.5, is no longer supported by AWS Kinesis Streams (see [here](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-upgrades.html)). This means that when building the Kinesis connector, you will need to specify a higher version KPL client (above 0.12.6) in order for the Flink Kinesis Producer to work. You can do this by specifying the preferred version via the `aws.kinesis-kpl.version` property, like so:
The streaming connectors are not part of the binary distribution. See how to link with them for cluster execution [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/linking.html).
流连接器不是二进制分发的一部分。在此处查看如何与它们链接以执行集群。
## Using the Amazon Kinesis Streams Service
Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will change as shards are closed and created by Kinesis.
@@ -92,31 +98,40 @@ val kinesis = env.addSource(new FlinkKinesisConsumer[String](
The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties` instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example demonstrates consuming a single Kinesis stream in the AWS region “us-east-1”. The AWS credentials are supplied using the basic method in which the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting `ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
使用者的其他可选配置键可以在中找到ConsumerConfigConstants。
Note that the configured parallelism of the Flink Kinesis Consumer source can be completely independent of the total number of shards in the Kinesis streams. When the number of shards is larger than the parallelism of the consumer, then each consumer subtask can subscribe to multiple shards; otherwise if the number of shards is smaller than the parallelism of the consumer, then some consumer subtasks will simply be idle and wait until it gets assigned new shards (i.e., when the streams are resharded to increase the number of shards for higher provisioned Kinesis service throughput).
Also note that the assignment of shards to subtasks may not be optimal when shard IDs are not consecutive (as result of dynamic re-sharding in Kinesis). For cases where skew in the assignment leads to significant imbalanced consumption, a custom implementation of `KinesisShardAssigner` can be set on the consumer.
The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
*`LATEST`: read all shards of all streams starting from the latest record.
*`TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
*`LATEST`: read all shards of all streams starting from the latest record. 从最新记录开始读取所有流的所有分片。
*`TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).从可能的最早记录开始读取所有流的所有碎片(根据保留设置,Kinesis可能会修剪数据)。
*`AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp. The timestamp must also be specified in the configuration properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date pattern :
* a non-negative double value representing the number of seconds that has elapsed since the Unix epoch (for example, `1459799926.480`).
一个非负的double值,表示自Unix纪元以来经过的秒数(例如1459799926.480)。
* a user defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`. If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX` (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern).
### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
### Fault Tolerance for Exactly-Once User-Defined State Update Semantics 用户定义的状态更新语义一次的容错能力
With Flink’s checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and periodically checkpoint each shard’s progress. In case of a job failure, Flink will restore the streaming program to the state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that was stored in the checkpoint.
The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
因此,绘制检查点的间隔定义了在发生故障的情况下最多可以返回多少程序。
To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment:
要使用容错的Kinesis使用者,需要在执行环境中启用拓扑检查点:
```
...
...
@@ -136,8 +151,9 @@ env.enableCheckpointing(5000) // checkpoint every 5000 msecs
Also note that Flink can only restart the topology if enough processing slots are available to restart the topology. Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards. Flink on YARN supports automatic restart of lost YARN containers.
If streaming topologies choose to use the [event time notion](//ci.apache.org/projects/flink/flink-docs-release-1.7/apis/streaming/event_time.html) for record timestamps, an _approximate arrival timestamp_ will be used by default. This timestamp is attached to records by Kinesis once they were successfully received and stored by streams. Note that this timestamp is typically referred to as a Kinesis server-side timestamp, and there are no guarantees about the accuracy or order correctness (i.e., the timestamps may not always be ascending).
Users can choose to override this default with a custom timestamp, as described [here](//ci.apache.org/projects/flink/flink-docs-release-1.7/apis/streaming/event_timestamps_watermarks.html), or use one from the [predefined ones](//ci.apache.org/projects/flink/flink-docs-release-1.7/apis/streaming/event_timestamp_extractors.html). After doing so, it can be passed to the consumer in the following way:
The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
Flink Kinesis Consumer使用多个线程进行分片发现和数据消耗。
For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard information even if the subtask initially did not have shards to read from when the consumer was started. In other words, if the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless of the total amount of shards in the subscribed streams.
For data consumption, a single thread will be created to consume each discovered shard. Threads will terminate when the shard it is responsible of consuming is closed as a result of stream resharding. In other words, there will always be one thread per open shard.
The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs for shard discovery and data consumption. Due to Amazon’s [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html) on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running. Below is a list of APIs called by the consumer with description of how the consumer uses the API, as well as information on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits.
* _[DescribeStream](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)_: this is constantly called by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default, the consumer performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result from Kinesis. If this interferes with other non-Flink consuming applications, users can slow down the consumer of calling this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied configuration properties. This sets the discovery interval to a different value. Note that this setting directly impacts the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval.
* _[GetShardIterator](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)_: this is called only once when per shard consuming threads are started, and will retry if Kinesis complains that the transaction limit for the API has exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream), the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down any other non-Flink consuming applications of calling this API, or modify the retry behaviour of this API call in the consumer by setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties.
* _[GetRecords](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)_: this is constantly called by per shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there are any other non-Flink consuming applications running), the per shard rate limit may be exceeded. By default, on each call of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has exceeded, up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 10,000), while the latter modifies the sleep interval between each fetch (default is 200). The retry behaviour of the consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.
Note that the producer is not participating in Flink’s checkpointing and doesn’t provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called “at-least-once” semantics.
The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL’s configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region “us-east-1”.
If users don’t specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL’s default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL’s default value to 100.
Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream. Otherwise, the returned stream name is used.
Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool mode uses a queue and thread pool to execute requests to Kinesis. This limits the number of threads that KPL’s native process may create, and therefore greatly lowers CPU utilization and improves efficiency. **Thus, We highly recommend Flink users use thread-pool model.** The default thread pool size is `10`. Users can set the pool size in `java.util.Properties` instance with key `ThreadPoolSize`, as shown in the above example.
Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in above example.
By default, `FlinkKinesisProducer` does not backpressure. Instead, records that cannot be sent because of the rate restriction of 1 MB per second per shard are buffered in an unbounded queue and dropped when their `RecordTtl` expires.
To avoid data loss, you can enable backpressuring by restricting the size of the internal queue:
为避免数据丢失,您可以通过限制内部队列的大小来启用反压:
```
// 200 Bytes per record, 1 shard
...
...
@@ -294,21 +327,25 @@ kinesis.setQueueLimit(500);
```
The value for `queueLimit` depends on the expected record size. To choose a good value, consider that Kinesis is rate-limited to 1MB per second per shard. If less than one second’s worth of records is buffered, then the queue may not be able to operate at full capacity. With the default `RecordMaxBufferedTime` of 100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit` can then be computed via
queue limit = (number of shards * queue size per shard) / record size
```
E.g. for 200Bytes per record and 8 shards, a queue limit of 4000 is a good starting point. If the queue size limits throughput (below 1MB per second per shard), try increasing the queue limit slightly.
## Using Non-AWS Kinesis Endpoints for Testing 使用非AWS Kinesis端点进行测试
It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
To override the AWS endpoint, taking the producer for example, set the `AWSConfigConstants.AWS_ENDPOINT` property in the Flink configuration, in addition to the `AWSConfigConstants.AWS_REGION` required by Flink. Although the region is required, it will not be used to determine the AWS endpoint URL.