Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
66e2358f
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
66e2358f
编写于
1月 25, 2019
作者:
W
wlliqipeng
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Modify the format of the title
上级
030f4322
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
93 addition
and
93 deletion
+93
-93
docs/cn/best_practice.md
docs/cn/best_practice.md
+93
-93
未找到文件。
docs/cn/best_practice.md
浏览文件 @
66e2358f
...
@@ -5,11 +5,11 @@
...
@@ -5,11 +5,11 @@
### 1.1 发送消息注意事项
### 1.1 发送消息注意事项
####
1 Tags的使用
####
1 Tags的使用
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。
一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。
####
2 Keys的使用
####
2 Keys的使用
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。
...
@@ -19,7 +19,7 @@
...
@@ -19,7 +19,7 @@
String
orderId
=
"20034568923546"
;
String
orderId
=
"20034568923546"
;
message
.
setKeys
(
orderId
);
message
.
setKeys
(
orderId
);
```
```
####
3 日志的打印
####
3 日志的打印
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明:
消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明:
...
@@ -73,18 +73,18 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
...
@@ -73,18 +73,18 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
### 2.2 消费速度慢的处理方式
### 2.2 消费速度慢的处理方式
####1 提高消费并行度
####
1 提高消费并行度
绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:
绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法:
-
同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
-
同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。
-
提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。
-
提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。
####2 批量方式消费
####
2 批量方式消费
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。
####3 跳过非重要消息
####
3 跳过非重要消息
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:
发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下:
...
@@ -106,7 +106,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
...
@@ -106,7 +106,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
```
```
####
4 优化每条消息消费过程
####
4 优化每条消息消费过程
举例如下,某条消息的消费过程如下:
举例如下,某条消息的消费过程如下:
...
@@ -118,7 +118,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
...
@@ -118,7 +118,7 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCS磁盘,前者的RT会小很多。
这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCS磁盘,前者的RT会小很多。
###2.3 消费打印日志
###
2.3 消费打印日志
如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
...
@@ -137,31 +137,31 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
...
@@ -137,31 +137,31 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相
### 2.4 其他消费建议
### 2.4 其他消费建议
####1 关于消费者和订阅
####
1 关于消费者和订阅
第一件需要注意的事情是,不同的消费者组可以独立的消费一些 topic,并且每个消费者组都有自己的消费偏移量,请确保同一组内的每个消费者订阅信息保持一致。
第一件需要注意的事情是,不同的消费者组可以独立的消费一些 topic,并且每个消费者组都有自己的消费偏移量,请确保同一组内的每个消费者订阅信息保持一致。
####2 关于有序消息
####
2 关于有序消息
消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。我们不建议抛出异常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。
消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。我们不建议抛出异常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。
####3 关于并发消费
####
3 关于并发消费
顾名思义,消费者将并发消费这些消息,建议你使用它来获得良好性能,我们不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代。
顾名思义,消费者将并发消费这些消息,建议你使用它来获得良好性能,我们不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代。
####4 关于消费状态Consume Status
####
4 关于消费状态Consume Status
对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。
对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。
####5 关于Blocking
####
5 关于Blocking
不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程
不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程
####6 关于线程数设置
####
6 关于线程数设置
消费者使用 ThreadPoolExecutor 在内部对消息进行消费,所以你可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来改变它。
消费者使用 ThreadPoolExecutor 在内部对消息进行消费,所以你可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来改变它。
####7 关于消费位点
####
7 关于消费位点
当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。
当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。
...
@@ -236,7 +236,7 @@ export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
...
@@ -236,7 +236,7 @@ export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他参数同理。
DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他参数同理。
####1 客户端的公共配置
####
1 客户端的公共配置
| 参数名 | 默认值 | 说明 |
| 参数名 | 默认值 | 说明 |
| ----------------------------- | ------- | ------------------------------------------------------------ |
| ----------------------------- | ------- | ------------------------------------------------------------ |
...
@@ -248,7 +248,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
...
@@ -248,7 +248,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
| heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
| heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 |
| persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
| persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 |
####2 Producer配置
####
2 Producer配置
| 参数名 | 默认值 | 说明 |
| 参数名 | 默认值 | 说明 |
| -------------------------------- | ---------------- | ------------------------------------------------------------ |
| -------------------------------- | ---------------- | ------------------------------------------------------------ |
...
@@ -266,7 +266,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
...
@@ -266,7 +266,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
| checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
| checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 |
| RPCHook | null | 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。 |
| RPCHook | null | 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。 |
####3 PushConsumer配置
####
3 PushConsumer配置
| 参数名 | 默认值 | 说明 |
| 参数名 | 默认值 | 说明 |
| ---------------------------- | ----------------------------- | ------------------------------------------------------------ |
| ---------------------------- | ----------------------------- | ------------------------------------------------------------ |
...
@@ -287,7 +287,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
...
@@ -287,7 +287,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
| consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
| consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 |
| pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
| pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 |
####4 PullConsumer配置
####
4 PullConsumer配置
| 参数名 | 默认值 | 说明 |
| 参数名 | 默认值 | 说明 |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
| -------------------------------- | ----------------------------- | ------------------------------------------------------------ |
...
@@ -301,7 +301,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
...
@@ -301,7 +301,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul
| registerTopics | | 注册的topic集合 |
| registerTopics | | 注册的topic集合 |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
| allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 |
####5 Message数据结构
####
5 Message数据结构
| 字段名 | 默认值 | 说明 |
| 字段名 | 默认值 | 说明 |
| -------------- | ------ | ------------------------------------------------------------ |
| -------------- | ------ | ------------------------------------------------------------ |
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录