提交 cc40d349 编写于 作者: D dongeforever

Polish the doc for static topic

上级 0c4ef244
...@@ -2,13 +2,12 @@ ...@@ -2,13 +2,12 @@
| 时间 | 主要内容 | 作者 | | 时间 | 主要内容 | 作者 |
| --- | --- | --- | | --- | --- | --- |
| 2021-11-01 | 初稿,包括背景、目标、SOT定义与持久化、SOT生命周期、SOT的使用、API逻辑修改、问题与风险 | dongeforever | | 2021-11-01 | 初稿,包括背景、目标、SOT定义与持久化、SOT生命周期、SOT的使用、API逻辑修改、问题与风险 | dongeforever |
| 2021-11-15 | | 2021-11-15 | 修改 LogicQueue 的定义,不要引入新字段,完全复用旧的MessageQueue; RemappingStaticTopic时,不要迁移『位点』『幂等数据』等,而是采用Double-Check-Read 的机制| dongforever |
1. 修改 LogicQueue 的定义,不要引入新字段,完全复用旧的MessageQueue
1. RemappingStaticTopic时,不要迁移『位点』『幂等数据』等,而是采用Double-Check-Read 的机制 中文文档在描述特定专业术语时,仍然使用英文。
| dongforever |
### 需求背景 ### 需求背景
LogicQueue 本质上是解决『固定队列数量』的需求。 StaticTopic/LogicQueue 本质上是解决『固定队列数量』的需求。
这个需求是不是必需的呢,如果是做应用集成,则可能不是必需的,但如果是做数据集成,则是必需的。 这个需求是不是必需的呢,如果是做应用集成,则可能不是必需的,但如果是做数据集成,则是必需的。
固定队列数量,首先可以解决『顺序性』的问题。 固定队列数量,首先可以解决『顺序性』的问题。
...@@ -18,8 +17,7 @@ LogicQueue 本质上是解决『固定队列数量』的需求。 ...@@ -18,8 +17,7 @@ LogicQueue 本质上是解决『固定队列数量』的需求。
有人可能会反驳,说计算组件清洗后,可以以批的方式写入其它存储组件。这当然是可以的,但如果是这样,MQ的价值就纯粹是一个『源头』价值,而不是『通道』价值。 有人可能会反驳,说计算组件清洗后,可以以批的方式写入其它存储组件。这当然是可以的,但如果是这样,MQ的价值就纯粹是一个『源头』价值,而不是『通道』价值。
MQ要想成为一个『数据通道』,则必需要具备可以让计算组件『回写』数据的能力,具备存储『Clean Data』的能力。 MQ要想成为一个『数据通道』,则必需要具备可以让计算组件『回写』数据的能力,具备存储『Clean Data』的能力,这样才让MQ有可能在数据集成领域站稳脚跟。
具备『回写 Clean Data』的能力,才让MQ有可能在数据集成领域站稳脚跟。
如果是 RocketMQ Streams 这种轻量化的组件,则『回写』会更频繁,更重要。 如果是 RocketMQ Streams 这种轻量化的组件,则『回写』会更频繁,更重要。
...@@ -41,21 +39,24 @@ LogicQueue的思路就是为了解决这一问题。 ...@@ -41,21 +39,24 @@ LogicQueue的思路就是为了解决这一问题。
- dynamic sharded topic, dynamic topic for short, which has queues increasing with the broker numbers. - dynamic sharded topic, dynamic topic for short, which has queues increasing with the broker numbers.
- static sharded topic, static topic for short, which has fixed queues, implemented with logic queues. - static sharded topic, static topic for short, which has fixed queues, implemented with logic queues.
『Static Topic』拥有固定的分片数量,每个分片称之为『Logic Queue』。
每个『Logic Queue』由多个『Physical Queue』进行纵向分段映射组成。
#### LogicQueue 目标 #### LogicQueue 目标
在客户端,LogicQueue 与普通 Queue 没有任何区别,使用一样的概念和对象,遵循一样的语义。 在客户端,LogicQueue 与 Physical Queue 使用体感上没有任何区别,使用一样的概念和对象,遵循一样的语义。
在服务端,针对LogicQueue去适配相关的API。 在服务端,针对 LogicQueue 去适配相关的API。
#### 队列语义 #### 队列语义
RocketMQ 目前的队列含有以下语义: RocketMQ Physical Queue 含有以下语义:
- 队列内的Offset,单调递增且连续 - 队列内的Offset,单调递增且连续
- 属于同一个 Broker 上的队列,编号单调递增且连续 - 属于同一个 Broker 上的队列,编号单调递增且连续
这次修改需要保障的语义: LogicQueue 需要保障的语义:
- 队列内的offset,单调递增 - 队列内的offset,单调递增
这次修改可以不保障的语义: LogicQueue 可以不保障的语义:
- 队列内的 offset 连续 - 队列内的 offset 连续
- 属于同一个 Broker 上的队列,编号单调递增且连续 - 属于同一个 Broker 上的队列,编号单调递增且连续
...@@ -75,7 +76,7 @@ offset不连续最直接的问题就是: ...@@ -75,7 +76,7 @@ offset不连续最直接的问题就是:
#### LogicQueue 定义 #### LogicQueue 定义
当前 MessageQueue的定义如下 当前 MessageQueue 的定义如下
``` ```
private String topic; private String topic;
private String brokerName; private String brokerName;
...@@ -100,18 +101,18 @@ private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, Strin ...@@ -100,18 +101,18 @@ private final ConcurrentMap<String/* Topic */, ConcurrentMap<MessageQueue, Strin
基本目标与定义清楚了,接下来的设计,从 Source of Truth 开始。 基本目标与定义清楚了,接下来的设计,从 Source of Truth 开始。
### SOT 定义持久化 ### SOT 定义持久化
LogicQueue 的 Source of Truth 就是 LogicQueue 到 Broker Queue 的映射关系。 LogicQueue 的 Source of Truth 就是 LogicQueue 到 Physical Queue 的映射关系。
只要这个映射关系不丢失,则整个系统的状态都是可以恢复的。 只要这个映射关系不丢失,则整个系统的状态都是可以恢复的。
反之,整个系统可能陷入混乱。 反之,整个系统可能陷入混乱。
这个SOT,命名为 TopicQueueMapping。 这个SOT,命名为 TopicQueueMapping。
#### 格式 #### Mapping Schema
``` ```
{ {
"version":"1", "version":"1",
"bname": "broker" //标记这份数据的原始存储位置,如果发送误拷贝,可以利用这个字段来进行标识 "bname": "broker02" //标记这份数据的原始存储位置,如果发送误拷贝,可以利用这个字段来进行标识
"epoch": 0, //标记修改版本,用来做一致性校验 "epoch": 0, //标记修改版本,用来做一致性校验
"totalQueues":"50", //当前Topic 总共有多少 LogicQueues "totalQueues":"50", //当前Topic 总共有多少 LogicQueues
"hostedQueues": { //当前Broker 所拥有的 LogicQueues "hostedQueues": { //当前Broker 所拥有的 LogicQueues
...@@ -142,7 +143,13 @@ LogicQueue 的 Source of Truth 就是 LogicQueue 到 Broker Queue 的映射关 ...@@ -142,7 +143,13 @@ LogicQueue 的 Source of Truth 就是 LogicQueue 到 Broker Queue 的映射关
} }
} }
``` ```
『拥有』的定义是指,映射关系的最近一个队列在当前Broker。 上述示例的含义是:
* broker02 拥有 LogicQueue 3
* LogicQueue 3 由 2 个 Physical Queue 组成
* 位点范围『0-1000』映射到 Physical Queue 『broker01-0』上面
* 位点范围『1000-』映射到 Physical Queue 『broker02-0』上面
『拥有』的定义是指,映射关系的最新队列在当前Broker。注意,在实现时,也会把次新队列存储下来作为备份。
注意以下要点: 注意以下要点:
...@@ -150,14 +157,21 @@ LogicQueue 的 Source of Truth 就是 LogicQueue 到 Broker Queue 的映射关 ...@@ -150,14 +157,21 @@ LogicQueue 的 Source of Truth 就是 LogicQueue 到 Broker Queue 的映射关
- 如果将来利用 LogicQueue 去做 Serverless 弹缩,则这个数据会加速膨胀,对这个数据的利用要谨慎 - 如果将来利用 LogicQueue 去做 Serverless 弹缩,则这个数据会加速膨胀,对这个数据的利用要谨慎
- 要写上 bname,以具备自我识别能力 - 要写上 bname,以具备自我识别能力
#### Leader Completeness #### Leader Completeness
RocketMQ 没有中心化的元数据存储,那就遵循『Leader Completeness』原则。 RocketMQ 没有中心化的元数据存储,那就遵循『Leader Completeness』原则。
对于每个逻辑队列,把所有映射关系存储在『最新队列所在的Broker上面』,最新队列,其实也是可写队列。 对于每个逻辑队列,把所有映射关系存储在『最新队列所在的Broker上面』,最新队列,其实也是可写队列。
Leader Completeness,避免了数据的切割,对于后续其它操作有极大的便利。 Leader Completeness,避免了数据的切割,对于后续其它操作有极大的便利。
#### 文件隔离 #### Global Epoch Check
对于每个Static Topic,在每个Broker都应该拥有一份『TopicQueueMapping』,每份都带有Epoch。
在创建和更新时,要对已有数据进行完备性校验,如果发现不完备,则说明上次操作失败,或者部分Broker数据丢失,应该先修复再操作。
注意:
即使当前Broker不拥有任何 LogicQueue 或者 PhysicalQueue,也应该存储一份,以做校验。
假设某个Static Topic只拥有1个Logic Queue,而对应的Broker整好宕机,则此时可以根据其它Broker的信息判断出该Topic不完备。
#### File Isolation
由于 RocketMQ 很多的运维习惯,都是直接拷贝 Topics.json 到别的机器进行部署的。 由于 RocketMQ 很多的运维习惯,都是直接拷贝 Topics.json 到别的机器进行部署的。
而 TopicQueueMapping 是 Broker 相关的,如果把 TopicQueueMapping 从一个Broker拷贝到另一个Broker,则会造成SOT冲突。 而 TopicQueueMapping 是 Broker 相关的,如果把 TopicQueueMapping 从一个Broker拷贝到另一个Broker,则会造成SOT冲突。
...@@ -281,9 +295,9 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限 ...@@ -281,9 +295,9 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限
基本操作流程: 基本操作流程:
- 1. 获取旧Leader,计算映射关系 - 1 获取旧Leader,计算映射关系
- 2. 迁移『位点』『幂等数据』等 - 2 迁移『位点』『幂等数据』等
- 3. 映射关系写入新/旧 Leader - 3 映射关系写入新/旧 Leader
其中第二步,数据量可能会很大,导致迁移周期非常长,且不是并发安全的。 其中第二步,数据量可能会很大,导致迁移周期非常长,且不是并发安全的。
但这些数据,都是覆盖型的,因此可以改造成不迁移的方式,而是在API层做兼容,也即『Double-Read-Check』机制: 但这些数据,都是覆盖型的,因此可以改造成不迁移的方式,而是在API层做兼容,也即『Double-Read-Check』机制:
...@@ -312,19 +326,17 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限 ...@@ -312,19 +326,17 @@ UpdateStaticTopic 命令会自动计算预期的分布情况,包括但不限
寻找映射关系,读最早的队列的MinOffset 寻找映射关系,读最早的队列的MinOffset
#### getMaxOffset #### getMaxOffset
无需改动,直接本机读即可。 本机读,转换成logicOffset即可。
#### getOffsetByTime #### getOffsetByTime
这个是最麻烦的,需要分段查找。 需要分段查找。
如果要优化查找速度,应该在映射关系里面,插入时间戳。 如果要优化查找速度,应该在映射关系里面,插入时间戳。
#### consumerOffsets 系列 #### consumerOffsets 系列
Offset的存储,无需转换,直接存储Logic值,读取时,直接读取。 Offset的存储,无需转换,直接存储在 LogicQueue 所对应的最新 PhysicalQueue 中。
无需修改,只需进行Review有无坑。 读取时,采取『Double-Read-Check』机制。
注意:remapping时需要迁移offsets。
#### Client #### Client
...@@ -332,7 +344,7 @@ Offset的存储,无需转换,直接存储Logic值,读取时,直接读取 ...@@ -332,7 +344,7 @@ Offset的存储,无需转换,直接存储Logic值,读取时,直接读取
- Consumer解压消息时,需要加上OffsetDelta逻辑 - Consumer解压消息时,需要加上OffsetDelta逻辑
#### SDK 兼容性分析 #### SDK 兼容性分析
如果要使用StaticTopic,则需要升级客户端 如果要使用StaticTopic,则需要升级Client、Broker、Nameserver
### 问题与风险 ### 问题与风险
#### 数据不一致问题 #### 数据不一致问题
...@@ -344,13 +356,20 @@ Offset的存储,无需转换,直接存储Logic值,读取时,直接读取 ...@@ -344,13 +356,20 @@ Offset的存储,无需转换,直接存储Logic值,读取时,直接读取
#### pullResult 位点由谁设置的问题 #### pullResult 位点由谁设置的问题
类似于Batch,由客户端设置,避免服务端解开消息。 类似于Batch,由客户端设置,避免服务端解开消息。
#### 切换时的offset决策问题 #### 切换时的offset决策问题
初期做简单一些,执行保守决策 默认保障高可用
#### 远程读的性能问题 #### 远程读的性能问题
云Kafka的实战经验来看,性能损耗几乎不计。 从实战经验来看,性能损耗几乎不计。
#### 使用习惯的改变 #### 使用习惯的改变
利用新的创建命令进行隔离。 利用新的创建命令进行隔离。
#### 消费SendBack问题
目前的实现里,消费Send Back,是直接传回Commitlog Pos,这个在LogicQueue里行不通。
需要修改API,改成传回『Logic Queue Offset』。
#### 二阶消息的兼容性 #### 二阶消息的兼容性
二阶消息需要支持远程读写操作,参考 二阶消息,也即『原始消息』存储在『系统Topic』中,需要经过一轮『Read-ReWrite』逻辑才会被用户看见的消息。
例如,定时消息,事务消息。
二阶消息需要支持远程读写操作。
一期的LogicQueue不支持『二阶消息』。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册