diff --git a/docs/1.7-SNAPSHOT/33.md b/docs/1.7-SNAPSHOT/33.md index 003b1065301d8fe8847ff7b4eaca7d29b75086dd..0b3a2817ac951d917f1c3ca3873ad6faf208747d 100644 --- a/docs/1.7-SNAPSHOT/33.md +++ b/docs/1.7-SNAPSHOT/33.md @@ -13,10 +13,10 @@ Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数 | Maven依赖 | 支持自 | 消费者和供应者类名称 | Kafka版 | 笔记 | | --- | --- | --- | --- | --- | -| Flink连接器-Kafka0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.4 | 在内部使用Kafka 的[SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API。Flink将抵消ZK的抵消。 | -| Flink连接器-Kafka0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | 使用新的[Consumer API](http://kafka.apache.org/documentation.html#newconsumerapi) Kafka。 | -| Flink连接器-Kafka0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | 此连接器支持[带有时间戳的Kafka消息,](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message)用于生成和使用。 | -| Flink连接器-Kafka0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 | 0.11.x | 由于0.11.x Kafka不支持scala 2.10。此连接器支持[Kafka事务性消息传递](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging),为生产者提供一次语义。 | +| flink-connector-Kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 FlinkKafkaProducer08 | 0.8.4 | 在内部使用Kafka 的[SimpleConsumer](https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example) API。Flink将抵消ZK的抵消。 | +| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 FlinkKafkaProducer09 | 0.9.x | 使用新的[Consumer API](http://kafka.apache.org/documentation.html#newconsumerapi) Kafka。 | +| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 FlinkKafkaProducer010 | 0.10.x | 此连接器支持[带有时间戳的Kafka消息,](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message)用于生成和使用。 | +| flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 FlinkKafkaProducer011 | 0.11.x | 由于0.11.x Kafka不支持scala 2.10。此连接器支持[Kafka事务性消息传递](https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging),为生产者提供一次语义。 | 然后,导入maven项目中的连接器: @@ -39,6 +39,27 @@ Flink提供特殊的Kafka连接器,用于从/向Kafka主题读取和写入数 * 按照[Kafka快速入门](https://kafka.apache.org/documentation.html#quickstart)的说明下载代码并启动服务器(每次启动应用程序前都需要启动Zookeeper和Kafka服务器)。 * 如果Kafka和zookeeper服务器在远程机器上运行,那么`advertised.host.name`将在设置`config/server.properties`文件必须设置本机的IP地址。 +## Kafka 1.0.0+连接器 +从Flink 1.7开始,有一个新的通用Kafka连接器,它不跟踪特定的Kafka主要版本。相反,它在Flink发布时跟踪最新版本的Kafka。 + +如果您的Kafka代理版本是1.0.0或更高版本,则应使用此Kafka连接器。如果使用旧版本的Kafka(0.11,0.10,0.9或0.8),则应使用与代理版本对应的连接器。 + +## 兼容性 +通过Kafka客户端API和代理的兼容性保证,通用Kafka连接器与较旧和较新的Kafka代理兼容。它与经纪人版本0.11.0或更高版本兼容,具体取决于所使用的功能。有关Kafka兼容性的详细信息,请参阅[Kafka文档](https://kafka.apache.org/protocol.html#protocol_compatibility)。 + +## 用法 +要使用通用Kafka连接器,请为其添加依赖关系: + +``` + + org.apache.flink + flink-connector-kafka_2.11 + 1.7.1 + +``` +然后实例化新的source(FlinkKafkaConsumer)和sink(FlinkKafkaProducer)。除了从模块和类名中删除特定的Kafka版本之外,API向后兼容Kafka 0.11连接器。 + + ## Kafka消费者 Flink的Kafka消费者被称为`FlinkKafkaConsumer08`(或`09`Kafka 0.9.0.x等)。它提供对一个或多个Kafka主题的访问。 @@ -85,7 +106,7 @@ stream = env -### 该 `DeserializationSchema` +### `DeserializationSchema` Flink Kafka Consumer需要知道如何将Kafka中的二进制数据转换为Java / Scala对象。在 `DeserializationSchema`允许用户指定这样的一个架构。`T deserialize(byte[] message)` 为每个Kafka消息调用该方法,从Kafka传递值。 @@ -368,7 +389,7 @@ stream = env **注意**:如果水印分配器依赖于从Kafka读取的记录来推进其水印(通常是这种情况),则所有主题和分区都需要具有连续的记录流。否则,整个应用程序的水印无法前进,并且所有基于时间的 算子操作(例如时间窗口或具有计时器的函数)都无法取得进展。单个空闲Kafka分区会导致此行为。计划进行Flink改进以防止这种情况发生(参见[FLINK-5479:FlinkKafkaConsumer中的每分区水印应考虑空闲分区](https://issues.apache.org/jira/browse/FLINK-5479))。同时,可能的解决方法是将_心跳消息发送_到所有消耗的分区,从而推进空闲分区的水印。 -## Kafka制片人 +## Kafka提供者 Flink的Kafka Producer被称为`FlinkKafkaProducer011`(或`010`Kafka 0.10.0.x等)。它允许将记录流写入一个或多个Kafka主题。 @@ -419,7 +440,7 @@ stream.addSink(myProducer) * _自定义分区程序_:要将记录分配给特定分区,可以为`FlinkKafkaPartitioner`构造函数提供a的实现。将为流中的每个记录调用此分区程序,以确定应将记录发送到的目标主题的确切分区。有关详细信息,请参阅[Kafka Producer Partitioning Scheme](#kafka-producer-partitioning-scheme)。 * _高级序列化模式_:与使用者类似,生产者还允许使用调用的高级序列化模式`KeyedSerializationSchema`,该模式允许单独序列化键和值。它还允许覆盖目标主题,以便一个生产者实例可以将数据发送到多个主题。 -### Kafka制片人分区计划 +### Kafka提供者分区方案 默认情况下,如果未为Flink Kafka Producer指定自定义分区程序,则生产者将使用`FlinkFixedPartitioner`将每个Flink Kafka Producer并行子任务映射到单个Kafka分区(即,接收器子任务接收的所有记录将最终都在同一个Kafka分区)。 @@ -448,34 +469,17 @@ stream.addSink(myProducer) **注意**:Kafka目前没有交易生产者,因此Flink无法保证一次性交付Kafka主题。 -**注意:**根据您的Kafka配置,即使在Kafka确认写入后您仍然可能会遇到数据丢失。特别要记住以下Kafka设置: - -* `的ack` -* `log.flush.interval.messages` -* `log.flush.interval.ms` -* `log.flush。*` - -上述选项的默认值很容易导致数据丢失。有关更多说明,请参阅Kafka文档。 -#### Kafka0.11 +#### Kafka0.11及以上 -启用Flink的检查点后,`FlinkKafkaProducer011`可以提供准确的一次交付保证。 +启用Flink的检查点后,`FlinkKafkaProducer011`(FlinkKafkaProducer 对于Kafka> = 1.0.0版本)可以提供准确的一次交付保证。 -除了启用Flink的检查点,您还可以通过将适当的`semantic`参数传递给以下选项来选择三种不同的 算子操作模式`FlinkKafkaProducer011`: +除了启用Flink的检查点,您还可以通过将适当的`semantic`参数传递给`FlinkKafkaProducer011`(`FlinkKafkaProducer`对于Kafka> = 1.0.0版本): * `Semantic.NONE`:Flink不保证任何东西。生成的记录可能会丢失,也可能会被复制。 * `Semantic.AT_LEAST_ONCE`(默认设置):类似于`setFlushOnCheckpoint(true)`在 `FlinkKafkaProducer010`。这可以保证不会丢失任何记录(尽管它们可以重复)。 * `Semantic.EXACTLY_ONCE`:使用Kafka事务提供完全一次的语义。每当您使用事务写入Kafka时,不要忘记为消耗Kafka记录的任何应用程序设置所需的`isolation.level`(`read_committed` 或`read_uncommitted`- 后者是默认值)。 -**注意:**根据您的Kafka配置,即使在Kafka确认写入后您仍然可能会遇到数据丢失。特别要记住Kafka配置中的以下属性: - -* `的ack` -* `log.flush.interval.messages` -* `log.flush.interval.ms` -* `log.flush。*` - -上述选项的默认值很容易导致数据丢失。有关更多说明,请参阅Kafka文档。 - ##### 注意事项 `Semantic.EXACTLY_ONCE`模式依赖于在从所述检查点恢复之后提交在获取检查点之前启动的事务的能力。如果Flink应用程序崩溃和完成重启之间的时间较长,那么Kafka的事务超时将导致数据丢失(Kafka将自动中止超过超时时间的事务)。考虑到这一点,请根据预期的停机时间适当配置您的交易超时。 @@ -556,3 +560,21 @@ Flink通过Kafka连接器提供一流的支持,以对为Kerberos配置的Kafka 有关Kerberos安全性的Flink配置的更多信息,请参阅[此处](https://flink.sojb.cn/ops/config.html)。您还可以[在此处](https://flink.sojb.cn/ops/security-kerberos.html)找到有关Flink内部如何设置基于Kerberos的安全性的更多详细信息。 +## 故障排除 + +如果您在使用Flink时遇到Kafka问题,请记住Flink只包装[KafkaConsumer](https://kafka.apache.org/documentation/#consumerapi)或[KafkaProducer](https://kafka.apache.org/documentation/#producerapi),您的问题可能独立于Flink,有时可以通过升级Kafka代理,重新配置Kafka代理或重新配置`KafkaConsumer`或`KafkaProducer`在Flink中解决。下面列出了一些常见问题的例子。 + +## 数据丢失 + +**注意:**根据您的Kafka配置,即使在Kafka确认写入后您仍然可能会遇到数据丢失。特别要记住以下Kafka设置: + +* `ack` +* `log.flush.interval.messages` +* `log.flush.interval.ms` +* `log.flush.*` + +上述选项的默认值很容易导致数据丢失。有关更多说明,请参阅Kafka文档。 + +## 未知主题或分区异常 + +导致此错误的一个可能原因是新的领导者选举正在进行,例如在重新启动Kafka经纪人之后或期间。这是一个可重试的异常,因此Flink作业应该能够重启并恢复正常运行。它也可以通过更改`retries`生产者设置中的属性来规避。但是,这可能会导致重新排序消息,这反过来可以通过设置`max.in.flight.requests.per.connection`为1 来避免不需要的消息。