提交 19322401 编写于 作者: T Tzu-Li (Gordon) Tai

[FLINK-4577] [kinesis] Transparent reshard handling for FlinkKinesisConsumer

This closes #3458.
上级 a119a30d
......@@ -23,6 +23,9 @@ specific language governing permissions and limitations
under the License.
-->
* This will be replaced by the TOC
{:toc}
The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/).
To use the connector, add the following Maven dependency to your project:
......@@ -53,14 +56,14 @@ mvn clean install -Pinclude-kinesis -DskipTests
The streaming connectors are not part of the binary distribution. See how to link with them for cluster
execution [here]({{site.baseurl}}/dev/linking.html).
### Using the Amazon Kinesis Streams Service
## Using the Amazon Kinesis Streams Service
Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
to setup Kinesis streams. Make sure to create the appropriate IAM policy and user to read / write to the Kinesis streams.
### Kinesis Consumer
## Kinesis Consumer
The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
streams within the same AWS service region, and can handle resharding of streams. Each subtask of the consumer is
streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is
responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
change as shards are closed and created by Kinesis.
......@@ -107,13 +110,16 @@ to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from
Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
**NOTE:** Currently, resharding can not be handled transparently (i.e., without failing and restarting jobs) if there are idle consumer
subtasks, which occur when the total number of shards is lower than the configured consumer parallelism. The job must be
configured to enable checkpointing, so that the new shards due to resharding can be correctly picked up and consumed by the
Kinesis consumer after the job is restored. This is a temporary limitation that will be resolved in future versions.
Please see [FLINK-4341](https://issues.apache.org/jira/browse/FLINK-4341) for more detail.
Note that the configured parallelism of the Flink Kinesis Consumer source
can be completely independent of the total number of shards in the Kinesis streams.
When the number of shards is larger than the parallelism of the consumer,
then each consumer subtask can subscribe to multiple shards; otherwise
if the number of shards is smaller than the parallelism of the consumer,
then some consumer subtasks will simply be idle and wait until it gets assigned
new shards (i.e., when the streams are resharded to increase the
number of shards for higher provisioned Kinesis service throughput).
#### Configuring Starting Position
### Configuring Starting Position
The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
one of the following values in the provided configuration properties (the naming of the options identically follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
......@@ -127,7 +133,7 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME
If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
(for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern).
#### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
......@@ -157,7 +163,7 @@ Also note that Flink can only restart the topology if enough processing slots ar
Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
Flink on YARN supports automatic restart of lost YARN containers.
#### Event Time for Consumed Records
### Event Time for Consumed Records
<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
......@@ -201,7 +207,7 @@ kinesis = kinesis.assignTimestampsAndWatermarks(new CustomTimestampAssigner)
</div>
</div>
#### Threading Model
### Threading Model
The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
......@@ -214,7 +220,7 @@ For data consumption, a single thread will be created to consume each discovered
shard it is responsible of consuming is closed as a result of stream resharding. In other words, there will always be
one thread per open shard.
#### Internally Used Kinesis APIs
### Internally Used Kinesis APIs
The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
......@@ -248,7 +254,7 @@ adjusts the maximum number of records each consuming thread tries to fetch from
the latter modifies the sleep interval between each fetch (there will be no sleep by default). The retry behaviour of the
consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
### Kinesis Producer
## Kinesis Producer
The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
Flink's checkpointing and doesn't provide exactly-once processing guarantees.
......@@ -305,7 +311,7 @@ Otherwise, the returned stream name is used.
Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.
### Using Non-AWS Kinesis Endpoints for Testing
## Using Non-AWS Kinesis Endpoints for Testing
It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
[Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink
......
......@@ -19,7 +19,6 @@ package org.apache.flink.streaming.connectors.kinesis.internals;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
......@@ -325,35 +324,10 @@ public class KinesisDataFetcher<T> {
ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
// FLINK-4341:
// For downstream operators that work on time (ex. window operators), we are required to emit a max value watermark
// for subtasks that won't continue to have shards to read from unless resharding happens in the future, otherwise
// the downstream watermarks would not advance, leading to unbounded accumulating state.
//
// The side-effect of this limitation is that on resharding, we must fail hard if the newly discovered shard
// is to be subscribed by a subtask that has previously emitted a max value watermark, otherwise the watermarks
// will be messed up.
//
// There are 2 cases were we need to either emit a max value watermark, or deliberately fail hard:
// (a) if this subtask has no more shards to read from unless resharding happens in the future, we emit a max
// value watermark. This case is encountered when 1) all previously read shards by this subtask were closed
// due to resharding, 2) when this subtask was initially only subscribed to closed shards while the consumer
// was told to start from TRIM_HORIZON, or 3) there was initially no shards for this subtask to read on startup.
// (b) this subtask has discovered new shards to read from due to a reshard; if this subtask has already emitted
// a max value watermark, we must deliberately fail hard to avoid messing up the watermarks. The new shards
// will be subscribed by this subtask after restore as initial shards on startup.
//
// TODO: This is a temporary workaround until a min-watermark information service is available in the JobManager
// Please see FLINK-4341 for more detail
boolean emittedMaxValueWatermark = false;
if (this.numberOfActiveShards.get() == 0) {
// FLINK-4341 workaround case (a) - please see the above for details on this case
LOG.info("Subtask {} has no initial shards to read on startup; emitting max value watermark ...",
LOG.info("Subtask {} has no active shards to read on startup; marking the subtask as temporarily idle ...",
indexOfThisConsumerSubtask);
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
emittedMaxValueWatermark = true;
sourceContext.markAsTemporarilyIdle();
}
while (running) {
......@@ -363,41 +337,6 @@ public class KinesisDataFetcher<T> {
}
List<KinesisStreamShard> newShardsDueToResharding = discoverNewShardsToSubscribe();
// -- NOTE: Potential race condition between newShardsDueToResharding and numberOfActiveShards --
// Since numberOfActiveShards is updated by parallel shard consuming threads in updateState(), there exists
// a race condition with the currently queried newShardsDueToResharding. Therefore, numberOfActiveShards
// may not correctly reflect the discover result in the below case determination. This may lead to incorrect
// case determination on the current discovery attempt, but can still be correctly handled on future attempts.
//
// Although this can be resolved by wrapping the current shard discovery attempt with the below
// case determination within a synchronized block on the checkpoint lock for atomicity, there will be
// considerable throughput performance regression as shard discovery is a remote call to AWS. Therefore,
// since the case determination is a temporary workaround for FLINK-4341, the race condition is tolerable as
// we can still eventually handle max value watermark emitting / deliberately failing on successive
// discovery attempts.
if (newShardsDueToResharding.size() == 0 && this.numberOfActiveShards.get() == 0 && !emittedMaxValueWatermark) {
// FLINK-4341 workaround case (a) - please see the above for details on this case
LOG.info("Subtask {} has completed reading all shards; emitting max value watermark ...",
indexOfThisConsumerSubtask);
sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));
emittedMaxValueWatermark = true;
} else if (newShardsDueToResharding.size() > 0 && emittedMaxValueWatermark) {
// FLINK-4341 workaround case (b) - please see the above for details on this case
//
// Note that in the case where on resharding this subtask ceased to read all of it's previous shards
// but new shards is also to be subscribed by this subtask immediately after, emittedMaxValueWatermark
// will be false; this allows the fetcher to continue reading the new shards without failing on such cases.
// However, due to the race condition mentioned above, we might still fall into case (a) first, and
// then (b) on the next discovery attempt. Although the failure is ideally unnecessary, max value
// watermark emitting still remains to be correct.
LOG.warn("Subtask {} has discovered {} new shards to subscribe, but is failing hard to avoid messing" +
" up watermarks; the new shards will be subscribed by this subtask after restore ...",
indexOfThisConsumerSubtask, newShardsDueToResharding.size());
throw new RuntimeException("Deliberate failure to avoid messing up watermarks");
}
for (KinesisStreamShard shard : newShardsDueToResharding) {
// since there may be delay in discovering a new shard, all new shards due to
// resharding should be read starting from the earliest record possible
......@@ -605,9 +544,19 @@ public class KinesisDataFetcher<T> {
// if a shard's state is updated to be SENTINEL_SHARD_ENDING_SEQUENCE_NUM by its consumer thread,
// we've finished reading the shard and should determine it to be non-active
if (lastSequenceNumber.equals(SentinelSequenceNumber.SENTINEL_SHARD_ENDING_SEQUENCE_NUM.get())) {
this.numberOfActiveShards.decrementAndGet();
LOG.info("Subtask {} has reached the end of subscribed shard: {}",
indexOfThisConsumerSubtask, subscribedShardsState.get(shardStateIndex).getKinesisStreamShard());
// check if we need to mark the source as idle;
// note that on resharding, if registerNewSubscribedShardState was invoked for newly discovered shards
// AFTER the old shards had reached the end, the subtask's status will be automatically toggled back to
// be active immediately afterwards as soon as we collect records from the new shards
if (this.numberOfActiveShards.decrementAndGet() == 0) {
LOG.info("Subtask {} has reached the end of all currently subscribed shards; marking the subtask as temporarily idle ...",
indexOfThisConsumerSubtask);
sourceContext.markAsTemporarilyIdle();
}
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册