Commit 256c9c4d authored by Gordon Tai, committed by Robert Metzger

[FLINK-4033] Polish up Kinesis connector documentation

Includes:
1. Scala examples for consumer and producer
2. Add information about AWS Kinesis service usage
3. Add Kinesis connector to the fault tolerance guarantees table
4. Minor typo fix in Kafka documentation

This closes #2181
Parent fa42cdab
@@ -236,7 +236,7 @@ properties.setProperty("bootstrap.servers", "localhost:9092");
 properties.setProperty("zookeeper.connect", "localhost:2181");
 properties.setProperty("group.id", "test");
-val myConsumer = new FlinkKafkaConsumer08[Stirng]("topic", new SimpleStringSchema(), properties);
+val myConsumer = new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties);
 myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());
 stream = env
   .addSource(myConsumer)
@@ -52,12 +52,16 @@ mvn clean install -Pinclude-kinesis -DskipTests
 Note that the streaming connectors are not part of the binary distribution.
-See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
+See how to link with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-#### Usage of Consumer
+### Using the Amazon Kinesis Streams Service
+
+Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
+to set up Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
+
+### Kinesis Consumer
 
 The `FlinkKinesisConsumer` can be used to pull data from multiple Kinesis streams within the same AWS region in parallel.
-It participates in Flink's distributed snapshot checkpointing and provides exactly-once processing guarantees. Note
+It participates in Flink's distributed snapshot checkpointing and provides exactly-once user-defined state update guarantees. Note
 that the current version cannot handle resharding of Kinesis streams. When Kinesis streams are resharded, the consumer
 will fail and the Flink streaming job must be resubmitted.
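An illustrative aside, not part of the patch: the IAM policy mentioned above might look roughly like the following sketch. The account ID, the stream name `kinesis_stream_name`, the region, and the exact minimal set of actions are assumptions to verify against the AWS IAM documentation; the action names themselves are standard Kinesis IAM actions.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis_stream_name"
    }
  ]
}
```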
@@ -78,10 +82,28 @@ kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
 
 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
-DataStream<String> kinesisRecords = env.addSource(new FlinkKinesisConsumer<>(
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
     "kinesis_stream_name", new SimpleStringSchema(), kinesisConsumerConfig));
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisConsumerConfig = new Properties();
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisConsumerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "LATEST");
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, kinesisConsumerConfig))
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
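An illustrative aside, not part of the patch: starting the consumer from the earliest reachable record instead of the latest is a one-key change in the same `Properties` object. This fragment reuses the constants shown in the example above and is a sketch, not additional API:

```java
// Sketch only: same configuration as above, but with TRIM_HORIZON.
Properties kinesisConsumerConfig = new Properties();
kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
// ... credentials as in the example above ...
// TRIM_HORIZON starts from the oldest record still retained by the stream,
// whereas LATEST starts from the newest record.
kinesisConsumerConfig.put(KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE, "TRIM_HORIZON");
```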
@@ -92,13 +114,15 @@ the AWS access key ID and secret key are directly supplied in the configuration
 from the newest position in the Kinesis stream (the other option is to set `KinesisConfigConstants.CONFIG_STREAM_INIT_POSITION_TYPE`
 to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
 
-#### Usage of Producer
+Other optional configuration keys can be found in `KinesisConfigConstants`.
+
+### Kinesis Producer
 
-The `FlinkKinesisProducer` is used for putting data from a Flink stream onto a Kinesis stream. Note that the producer is not participating in
+The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
 Flink's checkpointing and doesn't provide exactly-once processing guarantees. In case of a failure, data will be written again
 to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
 
-To put data onto a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
+To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
 
 For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
@@ -113,16 +137,37 @@ kinesisProducerConfig.put(
 kinesisProducerConfig.put(
     KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
     "aws_secret_key_here");
 
-FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
+FlinkKinesisProducer<String> kinesis =
+    new FlinkKinesisProducer<>(new SimpleStringSchema(), kinesisProducerConfig);
 kinesis.setFailOnError(true);
-kinesis.setDefaultStream("test-flink");
+kinesis.setDefaultStream("kinesis_stream_name");
 kinesis.setDefaultPartition("0");
 
 DataStream<String> simpleStringStream = ...;
 simpleStringStream.addSink(kinesis);
 {% endhighlight %}
 </div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val kinesisProducerConfig = new Properties();
+kinesisProducerConfig.put(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_ACCESSKEYID,
+    "aws_access_key_id_here");
+kinesisProducerConfig.put(
+    KinesisConfigConstants.CONFIG_AWS_CREDENTIALS_PROVIDER_BASIC_SECRETKEY,
+    "aws_secret_key_here");
+
+val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, kinesisProducerConfig);
+kinesis.setFailOnError(true);
+kinesis.setDefaultStream("kinesis_stream_name");
+kinesis.setDefaultPartition("0");
+
+val simpleStringStream = ...;
+simpleStringStream.addSink(kinesis);
+{% endhighlight %}
+</div>
 </div>
 
 The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
@@ -103,7 +103,7 @@ env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
 
 ### Fault Tolerance Guarantees of Data Sources and Sinks
 
 Flink can guarantee exactly-once state updates to user-defined state only when the source participates in the
-snapshotting mechanism. This is currently guaranteed for the Kafka source (and internal number generators), but
+snapshotting mechanism. This is currently guaranteed for the Kafka source and AWS Kinesis Streams source (and internal number generators), but
 not for other sources. The following table lists the state update guarantees of Flink coupled with the bundled sources:
 
 <table class="table table-bordered">
@@ -120,6 +120,11 @@ not for other sources. The following table lists the state update guarantees of
       <td>exactly once</td>
       <td>Use the appropriate Kafka connector for your version</td>
     </tr>
+    <tr>
+      <td>AWS Kinesis Streams</td>
+      <td>exactly once</td>
+      <td>Current version does not handle stream resharding</td>
+    </tr>
     <tr>
       <td>RabbitMQ</td>
       <td>at most once (v 0.10) / exactly once (v 1.0)</td>
@@ -178,8 +183,13 @@ state updates) of Flink coupled with bundled sinks:
     </tr>
     <tr>
       <td>Cassandra sink</td>
-      <td>at-least-once / exactly-once</td>
-      <td>exactly-once only for idempotent updates</td>
+      <td>at least once / exactly once</td>
+      <td>exactly once only for idempotent updates</td>
     </tr>
+    <tr>
+      <td>AWS Kinesis Streams</td>
+      <td>at least once</td>
+      <td></td>
+    </tr>
     <tr>
       <td>File sinks</td>