提交 3f3d7197 编写于 作者: S shuang.kou

[feat]增加kafka 面试题

上级 0de7a49c
......@@ -265,7 +265,7 @@ SSO(Single Sign On)即单点登录说的是用户登陆多个子系统的其中
**Kafka:**
1. **[Kafka 入门+SpringBoot整合Kafka系列](https://github.com/Snailclimb/springboot-kafka)**
2. [Kafka系统设计开篇-面试看这篇就够了](docs/system-design/data-communication/Kafka系统设计开篇-面试看这篇就够了.md)
2. **[Kafka 常见面试题总结](docs/system-design/data-communication/kafka-inverview.md)**
3. [【加餐】Kafka入门看这一篇就够了](docs/system-design/data-communication/Kafka入门看这一篇就够了.md)
#### API 网关
......
> 原文链接:https://mp.weixin.qq.com/s/zxPz_aFEMrshApZQ727h4g
## 引言
MQ(消息队列)是跨进程通信的方式之一,可理解为异步rpc,上游系统对调用结果的态度往往是重要不紧急。使用消息队列有以下好处:业务解耦、流量削峰、灵活扩展。接下来介绍消息中间件Kafka。
## Kafka是什么?
Kafka是一个分布式的消息引擎。具有以下特征
能够发布和订阅消息流(类似于消息队列)
以容错的、持久的方式存储消息流
多分区概念,提高了并行能力
## Kafka架构总览
![Kafka系统架构](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/kafka%E6%9E%B6%E6%9E%84.png)
## Topic
消息的主题、队列,每一个消息都有它的topic,Kafka通过topic对消息进行归类。Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名,该dir包含了这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展。
## Partition
每个分区都是一个 顺序的、不可变的消息队列, 并且可以持续的添加;分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。
producer在发布消息的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区算法把消息存储到对应的分区中(一个分区存储多个消息),如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡。
![partition_info](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/partition.jpg)
## Broker
Kafka server,用来存储消息,Kafka集群中的每一个服务器都是一个Broker,消费者将从broker拉取订阅的消息
Producer
向Kafka发送消息,生产者会根据topic分发消息。生产者也负责把消息关联到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。也可以根据某种算法依照权重选择分区。算法可由开发者定义。
## Cousumer
Consermer实例可以是独立的进程,负责订阅和消费消息。消费者用consumerGroup来标识自己。同一个消费组可以并发地消费多个分区的消息,同一个partition也可以由多个consumerGroup并发消费,但是在consumerGroup中一个partition只能由一个consumer消费
## CousumerGroup
Consumer Group:同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个消息只发送给其中一个Consumer
# Kafka producer 设计原理
## 发送消息的流程
![partition_info](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/sendMsg.jpg)
**1.序列化消息&&.计算partition**
根据key和value的配置对消息进行序列化,然后计算partition:
ProducerRecord对象中如果指定了partition,就使用这个partition。否则根据key和topic的partition数目取余,如果key也没有的话就随机生成一个counter,使用这个counter来和partition数目取余。这个counter每次使用的时候递增。
**2发送到batch&&唤醒Sender 线程**
根据topic-partition获取对应的batchs(Deque<ProducerBatch>),然后将消息append到batch中.如果有batch满了则唤醒Sender 线程。队列的操作是加锁执行,所以batch内消息时有序的。后续的Sender操作当前方法异步操作。
![send_msg](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/send2Batch1.png)![send_msg2](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/send2Batch2.png)
**3.Sender把消息有序发到 broker(tp replia leader)**
**3.1 确定tp relica leader 所在的broker**
Kafka中 每台broker都保存了kafka集群的metadata信息,metadata信息里包括了每个topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客户端从任一broker都可以获取到需要的metadata信息;sender线程通过metadata信息可以知道tp leader的brokerId
producer也保存了metada信息,同时根据metadata更新策略(定期更新metadata.max.age.ms、失效检测,强制更新:检查到metadata失效以后,调用metadata.requestUpdate()强制更新
```
public class PartitionInfo {
private final String topic; private final int partition;
private final Node leader; private final Node[] replicas;
private final Node[] inSyncReplicas; private final Node[] offlineReplicas;
}
```
**3.2 幂等性发送**
为实现Producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。对于每个PID,该Producer发送消息的每个<Topic, Partition>都对应一个单调递增的Sequence Number。同样,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号)大一,则Broker会接受它,否则将其丢弃:
如果消息序号比Broker维护的序号差值比一大,说明中间有数据尚未写入,即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
Sender发送失败后会重试,这样可以保证每个消息都被发送到broker
**4. Sender处理broker发来的produce response**
一旦broker处理完Sender的produce请求,就会发送produce response给Sender,此时producer将执行我们为send()设置的回调函数。至此producer的send执行完毕。
## 吞吐性&&延时:
buffer.memory:buffer设置大了有助于提升吞吐性,但是batch太大会增大延迟,可搭配linger_ms参数使用
linger_ms:如果batch太大,或者producer qps不高,batch添加的会很慢,我们可以强制在linger_ms时间后发送batch数据
ack:producer收到多少broker的答复才算真的发送成功
0表示producer无需等待leader的确认(吞吐最高、数据可靠性最差)
1代表需要leader确认写入它的本地log并立即确认
-1/all 代表所有的ISR都完成后确认(吞吐最低、数据可靠性最高)
## Sender线程和长连接
每初始化一个producer实例,都会初始化一个Sender实例,新增到broker的长连接。
代码角度:每初始化一次KafkaProducer,都赋一个空的client
```
public KafkaProducer(final Map<String, Object> configs) {
this(configs, null, null, null, null, null, Time.SYSTEM);
}
```
![Sender_io](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/SenderIO.jpg)
终端查看TCP连接数:
lsof -p portNum -np | grep TCP
# Consumer设计原理
## poll消息
![consumer-pool](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/consumerPoll.jpg)
- 消费者通过fetch线程拉消息(单线程)
- 消费者通过心跳线程来与broker发送心跳。超时会认为挂掉
- 每个consumer
group在broker上都有一个coordnator来管理,消费者加入和退出,以及消费消息的位移都由coordnator处理。
## 位移管理
consumer的消息位移代表了当前group对topic-partition的消费进度,consumer宕机重启后可以继续从该offset开始消费。
在kafka0.8之前,位移信息存放在zookeeper上,由于zookeeper不适合高并发的读写,新版本Kafka把位移信息当成消息,发往__consumers_offsets 这个topic所在的broker,__consumers_offsets默认有50个分区。
消息的key 是groupId+topic_partition,value 是offset.
![consumerOffsetDat](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/consumerOffsetData.jpg)![consumerOffsetView](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/consumerOffsetView.jpg)
## Kafka Group 状态
![groupState](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/groupState.jpg)
- Empty:初始状态,Group 没有任何成员,如果所有的 offsets 都过期的话就会变成 Dead
- PreparingRebalance:Group 正在准备进行 Rebalance
- AwaitingSync:Group 正在等待来 group leader 的 分配方案
- Stable:稳定的状态(Group is stable);
- Dead:Group 内已经没有成员,并且它的 Metadata 已经被移除
## 重平衡reblance
当一些原因导致consumer对partition消费不再均匀时,kafka会自动执行reblance,使得consumer对partition的消费再次平衡。
什么时候发生rebalance?:
- 组订阅topic数变更
- topic partition数变更
- consumer成员变更
- consumer 加入群组或者离开群组的时候
- consumer被检测为崩溃的时候
## reblance过程
举例1 consumer被检测为崩溃引起的reblance
比如心跳线程在timeout时间内没和broker发送心跳,此时coordnator认为该group应该进行reblance。接下来其他consumer发来fetch请求后,coordnator将回复他们进行reblance通知。当consumer成员收到请求后,只有leader会根据分配策略进行分配,然后把各自的分配结果返回给coordnator。这个时候只有consumer leader返回的是实质数据,其他返回的都为空。收到分配方法后,consumer将会把分配策略同步给各consumer
举例2 consumer加入引起的reblance
使用join协议,表示有consumer 要加入到group中
使用sync 协议,根据分配规则进行分配
![reblance-join](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/reblance-join.jpg)![reblance-sync](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/reblance-sync.jpg)
(上图图片摘自网络)
## 引申:以上reblance机制存在的问题
在大型系统中,一个topic可能对应数百个consumer实例。这些consumer陆续加入到一个空消费组将导致多次的rebalance;此外consumer 实例启动的时间不可控,很有可能超出coordinator确定的rebalance timeout(即max.poll.interval.ms),将会再次触发rebalance,而每次rebalance的代价又相当地大,因为很多状态都需要在rebalance前被持久化,而在rebalance后被重新初始化。
## 新版本改进
**通过延迟进入PreparingRebalance状态减少reblance次数**
![groupStateOfNewVersion](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/groupStateOfNewVersion.jpg)
新版本新增了group.initial.rebalance.delay.ms参数。空消费组接受到成员加入请求时,不立即转化到PreparingRebalance状态来开启reblance。当时间超过group.initial.rebalance.delay.ms后,再把group状态改为PreparingRebalance(开启reblance)。实现机制是在coordinator底层新增一个group状态:InitialReblance。假设此时有多个consumer陆续启动,那么group状态先转化为InitialReblance,待group.initial.rebalance.delay.ms时间后,再转换为PreparingRebalance(开启reblance)
# Broker设计原理
Broker 是Kafka 集群中的节点。负责处理生产者发送过来的消息,消费者消费的请求。以及集群节点的管理等。由于涉及内容较多,先简单介绍,后续专门抽出一篇文章分享
## broker zk注册
![brokersInZk](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/brokersInZk.jpg)
## broker消息存储
Kafka的消息以二进制的方式紧凑地存储,节省了很大空间
此外消息存在ByteBuffer而不是堆,这样broker进程挂掉时,数据不会丢失,同时避免了gc问题
通过零拷贝和顺序寻址,让消息存储和读取速度都非常快
处理fetch请求的时候通过zero-copy 加快速度
## broker状态数据
broker设计中,每台机器都保存了相同的状态数据。主要包括以下:
controller所在的broker ID,即保存了当前集群中controller是哪台broker
集群中所有broker的信息:比如每台broker的ID、机架信息以及配置的若干组连接信息
集群中所有节点的信息:严格来说,它和上一个有些重复,不过此项是按照broker ID和***类型进行分组的。对于超大集群来说,使用这一项缓存可以快速地定位和查找给定节点信息,而无需遍历上一项中的内容,算是一个优化吧
集群中所有分区的信息:所谓分区信息指的是分区的leader、ISR和AR信息以及当前处于offline状态的副本集合。这部分数据按照topic-partitionID进行分组,可以快速地查找到每个分区的当前状态。(注:AR表示assigned replicas,即创建topic时为该分区分配的副本集合)
## broker负载均衡
**分区数量负载**:各台broker的partition数量应该均匀
partition Replica分配算法如下:
将所有Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
**容量大小负载:**每台broker的硬盘占用大小应该均匀
在kafka1.1之前,Kafka能够保证各台broker上partition数量均匀,但由于每个partition内的消息数不同,可能存在不同硬盘之间内存占用差异大的情况。在Kafka1.1中增加了副本跨路径迁移功能kafka-reassign-partitions.sh,我们可以结合它和监控系统,实现自动化的负载均衡
# Kafka高可用
在介绍kafka高可用之前先介绍几个概念
同步复制:要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率
异步复制:Follower异步的从Leader pull数据,data只要被Leader写入log认为已经commit,这种情况下如果Follower落后于Leader的比较多,如果Leader突然宕机,会丢失数据
## Isr
Kafka结合同步复制和异步复制,使用ISR(与Partition Leader保持同步的Replica列表)的方式在确保数据不丢失和吞吐率之间做了平衡。Producer只需把消息发送到Partition Leader,Leader将消息写入本地Log。Follower则从Leader pull数据。Follower在收到该消息向Leader发送ACK。一旦Leader收到了ISR中所有Replica的ACK,该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。这样如果leader挂了,只要Isr中有一个replica存活,就不会丢数据。
## Isr动态更新
Leader会跟踪ISR,如果ISR中一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预定值(replica.lag.max.messages)或者Follower超过一定时间(replica.lag.time.max.ms)未向Leader发送fetch请求。
broker Nodes In Zookeeper
/brokers/topics/[topic]/partitions/[partition]/state 保存了topic-partition的leader和Isr等信息
![partitionStateInZk](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/partitionStateInZk.jpg)
## Controller负责broker故障检查&&故障转移(fail/recover)
1. Controller在Zookeeper上注册Watch,一旦有Broker宕机,其在Zookeeper对应的znode会自动被删除,Zookeeper会触发
Controller注册的watch,Controller读取最新的Broker信息
2. Controller确定set_p,该集合包含了宕机的所有Broker上的所有Partition
3. 对set_p中的每一个Partition,选举出新的leader、Isr,并更新结果。
3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR
3.2 决定该Partition的新Leader和Isr。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)
![electLeader](https://blog-article-resource.oss-cn-beijing.aliyuncs.com/kafka/electLeader.jpg)
3.3 更新Leader、ISR、leader_epoch、controller_epoch:写入/brokers/topics/[topic]/partitions/[partition]/state
4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
## Controller挂掉
每个 broker 都会在 zookeeper 的临时节点 "/controller" 注册 watcher,当 controller 宕机时 "/controller" 会消失,触发broker的watch,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。
# 使用Kafka如何保证幂等性
不丢消息
首先kafka保证了对已提交消息的at least保证
Sender有重试机制
producer业务方在使用producer发送消息时,注册回调函数。在onError方法中重发消息
consumer 拉取到消息后,处理完毕再commit,保证commit的消息一定被处理完毕
不重复
consumer拉取到消息先保存,commit成功后删除缓存数据
# Kafka高性能
partition提升了并发
zero-copy
顺序写入
消息聚集batch
页缓存
业务方对 Kafka producer的优化
增大producer数量
ack配置
batch
------
## Kafka面试题总结
### Kafka 是什么?主要应用场景有哪些?
Kafka 是一个分布式流式处理平台。这到底是什么意思呢?
流平台具有三个关键功能:
1. **消息队列**:发布和订阅消息流,这个功能类似于消息队列,这也是 Kafka 也被归类为消息队列的原因。
2. **容错的持久方式存储记录消息流**: Kafka 会把消息持久化到磁盘,有效避免了消息丢失的风险·。
3. **流式处理平台:** 在消息发布的时候进行处理,Kafka 提供了一个完整的流式处理类库。
Kafka 主要有两大应用场景:
1. **消息队列** :建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
2. **数据处理:** 构建实时的流数据处理程序来转换或处理数据流。
### 和其他消息队列相比,Kafka的优势在哪里?
我们现在经常提到 Kafka 的时候就已经默认它是一个非常优秀的消息队列了,我们也会经常拿它给 RocketMQ、RabbitMQ 对比。我觉得 Kafka 相比其他消息队列主要的优势如下:
1. **极致的性能** :基于 Scala 和 Java 语言开发,设计中大量使用了批量处理和异步的思想,最高可以每秒处理千万级别的消息。
2. **生态系统兼容性无可匹敌** :Kafka 与周边生态系统的兼容性是最好的没有之一,尤其在大数据和流计算领域。
实际上在早期的时候 Kafka 并不是一个合格的消息队列,早期的 Kafka 在消息队列领域就像是一个衣衫褴褛的孩子一样,功能不完备并且有一些小问题比如丢失消息、不保证消息可靠性等等。当然,这也和 LinkedIn 最早开发 Kafka 用于处理海量的日志有很大关系,哈哈哈,人家本来最开始就不是为了作为消息队列滴,谁知道后面误打误撞在消息队列领域占据了一席之地。
随着后续的发展,这些短板都被 Kafka 逐步修复完善。所以,**Kafka 作为消息队列不可靠这个说法已经过时!**
### 队列模型了解吗?Kafka 的消息模型知道吗?
> 题外话:早期的 JMS 和 AMQP 属于消息服务领域权威组织所做的相关的标准,我在 [JavaGuide](https://github.com/Snailclimb/JavaGuide)的 [《消息队列其实很简单》](https://github.com/Snailclimb/JavaGuide#%E6%95%B0%E6%8D%AE%E9%80%9A%E4%BF%A1%E4%B8%AD%E9%97%B4%E4%BB%B6)这篇文章中介绍过。但是,这些标准的进化跟不上消息队列的演进速度,这些标准实际上已经属于废弃状态。所以,可能存在的情况是:不同的消息队列都有自己的一套消息模型。
#### 队列模型:早期的消息模型
![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-11/队列模型23.png)
**使用队列(Queue)作为消息通信载体,满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。** 比如:我们生产者发送 100 条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)
**队列模型存在的问题:**
假如我们存在这样一种情况:我们需要将生产者产生的消息分发给多个消费者,并且每个消费者都能接收到完成的消息内容。
这种情况,队列模型就不好解决了。很多比较杠精的人就说:我们可以为每个消费者创建一个单独的队列,让生产者发送多份。这是一种非常愚蠢的做法,浪费资源不说,还违背了使用消息队列的目的。
#### 发布-订阅模型:Kafka 消息模型
发布-订阅模型主要是为了解决队列模型存在的问题。
![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-11/广播模型21312.png)
发布订阅模型(Pub-Sub) 使用**主题(Topic)** 作为消息通信载体,类似于**广播模式**;发布者发布一条消息,该消息通过主题传递给所有的订阅者,**在一条消息广播之后才订阅的用户则是收不到该条消息的**
**在发布 - 订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。所以说,发布 - 订阅模型在功能层面上是可以兼容队列模型的。**
**Kafka 采用的就是发布 - 订阅模型。**
> **RocketMQ 的消息模型和 Kafka 基本是完全一样的。唯一的区别是 Kafka 中没有队列这个概念,与之对应的是 Partition(分区)。**
### 什么是Producer、Consumer、Broker、Topic、Partition?
Kafka 将生产者发布的消息发送到 **Topic(主题)** 中,需要这些消息的消费者可以订阅这些 **Topic(主题)**,如下图所示:
![Kafka Topic Partition](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-11/KafkaTopicPartitioning.png)
上面这张图也为我们引出了,Kafka 比较重要的几个概念:
1. **Producer(生产者)** : 产生消息的一方。
2. **Consumer(消费者)** : 消费消息的一方。
3. **Broker(代理)** : 可以看作是一个独立的 Kafka 实例。多个 Kafka Broker 组成一个 Kafka Cluster。
同时,你一定也注意到每个 Broker 中又包含了 Topic 以及 Partition 这两个重要的概念:
- **Topic(主题)** : Producer 将消息发送到特定的主题,Consumer 通过订阅特定的 Topic(主题) 来消费消息。
- **Partition(分区)** : Partition 属于 Topic 的一部分。一个 Topic 可以有多个 Partition ,并且同一 Topic 下的 Partition 可以分布在不同的 Broker 上,这也就表明一个 Topic 可以横跨多个 Broker 。这正如我上面所画的图一样。
> 划重点:**Kafka 中的 Partition(分区) 实际上可以对应成为消息队列中的队列。这样是不是更好理解一点?**
### Kafka 的多副本机制了解吗?带来了什么好处?
还有一点我觉得比较重要的是 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。
> 生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。当 leader 副本发生故障时会从 follower 中选举出一个 leader,但是 follower 中如果有和 leader 同步程度达不到要求的参加不了 leader 的竞选。
**Kafka 的多分区(Partition)以及多副本(Replica)机制有什么好处呢?**
1. Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力(负载均衡)。
2. Partition 可以指定对应的 Replica 数, 这也极大地提高了消息存储的安全性, 提高了容灾能力,不过也相应的增加了所需要的存储空间。
### Zookeeper 在 Kafka 中的作用知道吗?
> **要想搞懂 zookeeper 在 Kafka 中的作用 一定要自己搭建一个 Kafka 环境然后自己进 zookeeper 去看一下有哪些文件夹和 Kafka 有关,每个节点又保存了什么信息。** 一定不要光看不实践,这样学来的也终会忘记!这部分内容参考和借鉴了这篇文章:https://www.jianshu.com/p/a036405f989c 。
下图就是我的本地 Zookeeper ,它成功和我本地的 Kafka 关联上(以下文件夹结构借助 idea 插件 Zookeeper tool 实现)。
<img src="https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-11/zookeeper-kafka.jpg" style="zoom:50%;" />
ZooKeeper 主要为 Kafka 提供元数据的管理的功能。
从图中我们可以看出,Zookeeper 主要为 Kafka 做了下面这些事情:
1. **Broker 注册** :在 Zookeeper 上会有一个专门**用来进行 Broker 服务器列表记录**的节点。每个 Broker 在启动时,都会到 Zookeeper 上进行注册,即到/brokers/ids 下创建属于自己的节点。每个 Broker 就会将自己的 IP 地址和端口等信息记录到该节点中去
2. **Topic 注册** : 在 Kafka 中,同一个**Topic 的消息会被分成多个分区**并将其分布在多个 Broker 上,**这些分区信息及与 Broker 的对应关系**也都是由 Zookeeper 在维护。比如我创建了一个名字为 my-topic 的主题并且它有两个分区,对应到 zookeeper 中会创建这些文件夹:`/brokers/topics/my-topic/Partitions/0``/brokers/topics/my-topic/Partitions/1`
3. **负载均衡** :上面也说过了 Kafka 通过给特定 Topic 指定多个 Partition, 而各个 Partition 可以分布在不同的 Broker 上, 这样便能提供比较好的并发能力。 对于同一个 Topic 的不同 Partition,Kafka 会尽力将这些 Partition 分布到不同的 Broker 服务器上。当生产者产生消息后也会尽量投递到不同 Broker 的 Partition 里面。当 Consumer 消费的时候,Zookeeper 可以根据当前的 Partition 数量以及 Consumer 数量来实现动态负载均衡。
4. ......
### Kafka 如何保证消息的消费顺序?
我们在使用消息队列的过程中经常有业务场景需要严格保证消息的消费顺序,比如我们同时发了 2 个消息,这 2 个消息对应的操作分别对应的数据库操作是:更改用户会员等级、根据会员等级计算订单价格。假如这两条消息的消费顺序不一样造成的最终结果就会截然不同。
我们知道 Kafka 中 Partition(分区)是真正保存消息的地方,我们发送的消息都被放在了这里。而我们的 Partition(分区) 又存在于 Topic(主题) 这个概念中,并且我们可以给特定 Topic 指定多个 Partition。
![](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-11/KafkaTopicPartionsLayout.png)
每次添加消息到 Partition(分区) 的时候都会采用尾加法,如上图所示。Kafka 只能为我们保证 Partition(分区) 中的消息有序,而不能保证 Topic(主题) 中的 Partition(分区) 的有序。
> 消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。
所以,我们就有一种很简单的保证消息消费顺序的方法:**1 个 Topic 只对应一个 Partition**。这样当然可以解决问题,但是破坏了 Kafka 的设计初衷。
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 Partition 的话,所有消息都会被发送到指定的 Partition。并且,同一个 key 的消息可以保证只发送到同一个 partition,这个我们可以采用表/对象的 id 来作为 key 。
总结一下,对于如何保证 Kafka 中消息消费的顺序,有了下面两种方法:
1. 1 个 Topic 只对应一个 Partition。
2. (推荐)发送消息的时候指定 key/Partition。
当然不仅仅只有上面两种方法,上面两种方法是我觉得比较好理解的,
### Kafka 如何保证消息不丢失
#### 生产者丢失消息的情况
生产者(Producer) 调用`send`方法发送消息之后,消息可能因为网络问题并没有发送过去。
所以,我们不能默认在调用`send`方法发送消息之后消息消息发送成功了。为了确定消息是发送成功,我们要判断消息发送的结果。但是要注意的是 Kafka 生产者(Producer) 使用 `send` 方法发送消息实际上是异步的操作,我们可以通过 `get()`方法获取调用结果,但是这样也让它变为了同步操作,示例代码如下:
> **详细代码见我的这篇文章:[Kafka系列第三篇!10 分钟学会如何在 Spring Boot 程序中使用 Kafka 作为消息队列?](https://mp.weixin.qq.com/s?__biz=Mzg2OTA0Njk0OA==&mid=2247486269&idx=2&sn=ec00417ad641dd8c3d145d74cafa09ce&chksm=cea244f6f9d5cde0c8eb233fcc4cf82e11acd06446719a7af55230649863a3ddd95f78d111de&token=1633957262&lang=zh_CN#rd)**
```java
SendResult<String, Object> sendResult = kafkaTemplate.send(topic, o).get();
if (sendResult.getRecordMetadata() != null) {
logger.info("生产者成功发送消息到" + sendResult.getProducerRecord().topic() + "-> " + sendRe
sult.getProducerRecord().value().toString());
}
```
但是一般不推荐这么做!可以采用为其添加回调函数的形式,示例代码如下:
````java
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, o);
future.addCallback(result -> logger.info("生产者成功发送消息到topic:{} partition:{}的消息", result.getRecordMetadata().topic(), result.getRecordMetadata().partition()),
ex -> logger.error("生产者发送消失败,原因:{}", ex.getMessage()));
````
如果消息发送失败的话,我们检查失败的原因之后重新发送即可!
**另外这里推荐为 Producer 的`retries `(重试次数)设置一个比较合理的值,一般是 3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你3次一下子就重试完了**
#### 消费者丢失消息的情况
我们知道消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示 Consumer 当前消费到的 Partition(分区)的所在的位置。Kafka 通过偏移量(offset)可以保证消息在分区内的顺序性。
![kafka offset](https://my-blog-to-use.oss-cn-beijing.aliyuncs.com/2019-11/kafka-offset.jpg)
当消费者拉取到了分区的某个消息之后,消费者会自动提交了 offset。自动提交的话会有一个问题,试想一下,当消费者刚拿到这个消息准备进行真正消费的时候,突然挂掉了,消息实际上并没有被消费,但是 offset 却被自动提交了。
**解决办法也比较粗暴,我们手动关闭闭自动提交 offset,每次在真正消费完消息之后之后再自己手动提交 offset 。** 但是,细心的朋友一定会发现,这样会带来消息被重新消费的问题。比如你刚刚消费完消息之后,还没提交 offset,结果自己挂掉了,那么这个消息理论上就会被消费两次。
#### Kafka 弄丢了消息
我们知道 Kafka 为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做 leader 的家伙,其他副本称为 follower。我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。生产者和消费者只与 leader 副本交互。你可以理解为其他副本只是 leader 副本的拷贝,它们的存在只是为了保证消息存储的安全性。
**试想一种情况:假如 leader 副本所在的 broker 突然挂掉,那么就要从 follower 副本重新选出一个 leader ,但是 leader 的数据还有一些没有被 follower 副本的同步的话,就会造成消息丢失。**
**设置 acks = all**
解决办法就是我们设置 **acks = all**。acks 是 Kafka 生产者(Producer) 很重要的一个参数。
acks 的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置 **acks = all** 代表则所有副本都要接收到该消息之后该消息才算真正成功被发送。
**设置 replication.factor >= 3**
为了保证 leader 副本能有 follower 副本能同步消息,我们一般会为 topic 设置 **replication.factor >= 3**。这样就可以保证每个 分区(partition) 至少有 3 个副本。虽然造成了数据冗余,但是带来了数据的安全性。
**设置 min.insync.replicas > 1**
一般情况下我们还需要设置 **min.insync.replicas> 1** ,这样配置代表消息至少要被写入到 2 个副本才算是被成功发送。**min.insync.replicas** 的默认值为 1 ,在实际生产中应尽量避免默认值 1。
但是,为了保证整个 Kafka 服务的高可用性,你需要确保 **replication.factor > min.insync.replicas** 。为什么呢?设想一下加入两者相等的话,只要是有一个副本挂掉,整个分区就无法正常工作了。这明显违反高可用性!一般推荐设置成 **replication.factor = min.insync.replicas + 1**
**设置 unclean.leader.election.enable = false**
> **Kafka 0.11.0.0版本开始 unclean.leader.election.enable 参数的默认值由原来的true 改为false**
我们最开始也说了我们发送的消息会被发送到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步。多个 follower 副本之间的消息同步情况不一样,当我们配置了 **unclean.leader.election.enable = false** 的话,当 leader 副本发生故障时就不会从 follower 副本中和 leader 同步程度达不到要求的副本中选择出 leader ,这样降低了消息丢失的可能性。
### Kafka 如何保证消息不重复消费
代办...
### Reference
- Kafka 官方文档: https://kafka.apache.org/documentation/
- 极客时间—《Kafka核心技术与实战》第11节:无消息丢失配置怎么实现?
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册