diff --git a/docs/cn/best_practice.md b/docs/cn/best_practice.md index 03a3132cfe637c93ca156075b16cb47696e9b876..5c669eea173d5b0e5a9eb9619e08cc61a8814e1b 100755 --- a/docs/cn/best_practice.md +++ b/docs/cn/best_practice.md @@ -1,43 +1,43 @@ # 最佳实践(best practice) - + ## 1 生产者 ### 1.1 发送消息注意事项 -####1 Tags的使用 - -一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。 - -####2 Keys的使用 - +#### 1 Tags的使用 + +一个应用尽可能用一个Topic,而消息子类型则可以用tags来标识。tags可以由应用自由设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤:message.setTags("TagA")。 + +#### 2 Keys的使用 + 每个消息在业务层面的唯一标识码要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic、key来查询这条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,这样可以避免潜在的哈希冲突。 - - + + ```java // 订单Id String orderId = "20034568923546"; - message.setKeys(orderId); -``` -####3 日志的打印 + message.setKeys(orderId); +``` +#### 3 日志的打印 + +​消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明: -​消息发送成功或者失败要打印消息日志,务必要打印SendResult和key字段。send消息方法只要不抛异常,就代表发送成功。发送成功会有多个状态,在sendResult里定义。以下对每个状态进行说明: - - **SEND_OK** - -消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。 - + +消息发送成功。要注意的是消息发送成功也不意味着它是可靠的。要确保不会丢失任何消息,还应启用同步Master服务器或同步刷盘,即SYNC_MASTER或SYNC_FLUSH。 + - **FLUSH_DISK_TIMEOUT** - -消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。 - + +消息发送成功但是服务器刷盘超时。此时消息已经进入服务器队列(内存),只有服务器宕机,消息才会丢失。消息存储配置参数中可以设置刷盘方式和同步刷盘时间长度,如果Broker服务器设置了刷盘方式为同步刷盘,即FlushDiskType=SYNC_FLUSH(默认为异步刷盘方式),当Broker服务器未在同步刷盘时间内(默认为5s)完成刷盘,则将返回该状态——刷盘超时。 + - **FLUSH_SLAVE_TIMEOUT** - + 消息发送成功,但是服务器同步到Slave时超时。此时消息已经进入服务器队列,只有服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master即ASYNC_MASTER),并且从Broker服务器未在同步刷盘时间(默认为5秒)内完成与主服务器的同步,则将返回该状态——数据同步到Slave服务器超时。 - + - **SLAVE_NOT_AVAILABLE** - + 消息发送成功,但是此时Slave不可用。此时消息已经进入Master服务器队列,只有Master服务器宕机,消息才会丢失。如果Broker服务器的角色是同步Master,即SYNC_MASTER(默认是异步Master服务器即ASYNC_MASTER),但没有配置slave Broker服务器,则将返回该状态——无Slave服务器可用。 @@ -48,14 +48,14 @@ Producer的send方法本身支持内部重试,重试逻辑如下: - 至多重试2次(同步发送为2次,异步发送为0次)。 - 如果发送失败,则轮转到下一个Broker。这个方法的总耗时时间不超过sendMsgTimeout设置的值,默认10s。 - 如果本身向broker发送消息产生超时异常,就不会再重试。 - -以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。 + +以上策略也是在一定程度上保证了消息可以发送成功。如果业务对消息可靠性要求比较高,建议应用增加相应的重试逻辑:比如调用send同步方法发送失败时,则尝试将消息存储到db,然后由后台线程定时重试,确保消息一定到达Broker。 上述db重试方式为什么没有集成到MQ客户端内部做,而是要求应用自己去完成,主要基于以下几点考虑:首先,MQ的客户端设计为无状态模式,方便任意的水平扩展,且对机器资源的消耗仅仅是cpu、内存、网络。其次,如果MQ客户端内部集成一个KV存储模块,那么数据只有同步落盘才能较可靠,而同步落盘本身性能开销较大,所以通常会采用异步落盘,又由于应用关闭过程不受MQ运维人员控制,可能经常会发生 kill -9 这样暴力方式关闭,造成数据没有及时落盘而丢失。第三,Producer所在机器的可靠性较低,一般为虚拟机,不适合存储重要数据。综上,建议重试过程交由应用来控制。 ### 1.3选择oneway形式发送 -通常消息的发送是这样一个过程: - +通常消息的发送是这样一个过程: + - 客户端发送请求到服务器 - 服务器处理请求 - 服务器向客户端返回应答 @@ -73,18 +73,18 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 ### 2.2 消费速度慢的处理方式 -####1 提高消费并行度 - -绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法: +#### 1 提高消费并行度 + +绝大部分消息消费行为都属于 IO 密集型,即可能是操作数据库,或者调用 RPC,这类消费行为的消费速度在于后端数据库或者外系统的吞吐量,通过增加消费并行度,可以提高总的消费吞吐量,但是并行度增加到一定程度,反而会下降。所以,应用必须要设置合理的并行度。 如下有几种修改消费并行度的方法: - 同一个 ConsumerGroup 下,通过增加 Consumer 实例数量来提高并行度(需要注意的是超过订阅队列数的 Consumer 实例无效)。可以通过加机器,或者在已有机器启动多个进程的方式。 -- 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。 +- 提高单个 Consumer 的消费并行线程,通过修改参数 consumeThreadMin、consumeThreadMax实现。 + +#### 2 批量方式消费 -####2 批量方式消费 - -某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。 +某些业务流程如果支持批量方式消费,则可以很大程度上提高消费吞吐量,例如订单扣款类应用,一次处理一个订单耗时 1 s,一次处理 10 个订单可能也只耗时 2 s,这样即可大幅度提高消费的吞吐量,通过设置 consumer的 consumeMessageBatchMaxSize 返个参数,默认是 1,即一次只消费一条消息,例如设置为 N,那么每次消费的消息数小于等于 N。 -####3 跳过非重要消息 +#### 3 跳过非重要消息 发生消息堆积时,如果消费速度一直追不上发送速度,如果业务对数据要求不高的话,可以选择丢弃不重要的消息。例如,当某个队列的消息数堆积到100000条以上,则尝试丢弃部分或全部消息,这样就可以快速追上发送消息的速度。示例代码如下: @@ -103,13 +103,13 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 // TODO 正常消费过程 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } -``` - +``` -####4 优化每条消息消费过程 + +#### 4 优化每条消息消费过程 -举例如下,某条消息的消费过程如下: - +举例如下,某条消息的消费过程如下: + - 根据消息从 DB 查询【数据 1】 - 根据消息从 DB 查询【数据 2】 - 复杂的业务计算 @@ -118,9 +118,9 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 这条消息的消费过程中有4次与 DB的 交互,如果按照每次 5ms 计算,那么总共耗时 20ms,假设业务计算耗时 5ms,那么总过耗时 25ms,所以如果能把 4 次 DB 交互优化为 2 次,那么总耗时就可以优化到 15ms,即总体性能提高了 40%。所以应用如果对时延敏感的话,可以把DB部署在SSD硬盘,相比于SCS磁盘,前者的RT会小很多。 -###2.3 消费打印日志 +### 2.3 消费打印日志 -如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。 +如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。 ```java @@ -132,41 +132,41 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } ``` - + 如果能打印每条消息消费耗时,那么在排查消费慢等线上问题时,会更方便。 ### 2.4 其他消费建议 -####1 关于消费者和订阅 +#### 1 关于消费者和订阅 ​第一件需要注意的事情是,不同的消费者组可以独立的消费一些 topic,并且每个消费者组都有自己的消费偏移量,请确保同一组内的每个消费者订阅信息保持一致。 -####2 关于有序消息 +#### 2 关于有序消息 消费者将锁定每个消息队列,以确保他们被逐个消费,虽然这将会导致性能下降,但是当你关心消息顺序的时候会很有用。我们不建议抛出异常,你可以返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 作为替代。 -####3 关于并发消费 +#### 3 关于并发消费 顾名思义,消费者将并发消费这些消息,建议你使用它来获得良好性能,我们不建议抛出异常,你可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 作为替代。 -####4 关于消费状态Consume Status +#### 4 关于消费状态Consume Status 对于并发的消费监听器,你可以返回 RECONSUME_LATER 来通知消费者现在不能消费这条消息,并且希望可以稍后重新消费它。然后,你可以继续消费其他消息。对于有序的消息监听器,因为你关心它的顺序,所以不能跳过消息,但是你可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT 告诉消费者等待片刻。 -####5 关于Blocking +#### 5 关于Blocking 不建议阻塞监听器,因为它会阻塞线程池,并最终可能会终止消费进程 -####6 关于线程数设置 +#### 6 关于线程数设置 消费者使用 ThreadPoolExecutor 在内部对消息进行消费,所以你可以通过设置 setConsumeThreadMin 或 setConsumeThreadMax 来改变它。 -####7 关于消费位点 +#### 7 关于消费位点 + +当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。 + + -当建立一个新的消费者组时,需要决定是否需要消费已经存在于 Broker 中的历史消息CONSUME_FROM_LAST_OFFSET 将会忽略历史消息,并消费之后生成的任何消息。CONSUME_FROM_FIRST_OFFSET 将会消费每个存在于 Broker 中的信息。你也可以使用 CONSUME_FROM_TIMESTAMP 来消费在指定时间戳后产生的消息。 - - - ## 3 Broker @@ -174,25 +174,25 @@ msgId一定是全局唯一标识符,但是实际使用中,可能会存在相 ### 3.1 Broker 角色 ​ Broker 角色分为 ASYNC_MASTER(异步主机)、SYNC_MASTER(同步主机)以及SLAVE(从机)。如果对消息的可靠性要求比较严格,可以采用 SYNC_MASTER加SLAVE的部署方式。如果对消息可靠性要求不高,可以采用ASYNC_MASTER加SLAVE的部署方式。如果只是测试方便,则可以选择仅ASYNC_MASTER或仅SYNC_MASTER的部署方式。 ### 3.2 FlushDiskType -​ SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。 -​ -​ -​ -​ -​ +​ SYNC_FLUSH(同步刷新)相比于ASYNC_FLUSH(异步处理)会损失很多性能,但是也更可靠,所以需要根据实际的业务场景做好权衡。 +​ +​ +​ +​ +​ ​ ## 4 NameServer -​RocketMQ 中,Name Servers 被设计用来做简单的路由管理。其职责包括: +​RocketMQ 中,Name Servers 被设计用来做简单的路由管理。其职责包括: - Brokers 定期向每个名称服务器注册路由数据。 - 名称服务器为客户端,包括生产者,消费者和命令行客户端提供最新的路由信息。 -​ -​ - - - +​ +​ + + + ## 5 客户端配置 @@ -209,7 +209,7 @@ producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"); consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"); ``` -- Java启动参数中指定Name Server地址 +- Java启动参数中指定Name Server地址 ```text -Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876 @@ -218,10 +218,10 @@ consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876"); ```text export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876 -``` +``` - HTTP静态服务器寻址(默认) -客户端启动后,会定时访问一个静态HTTP服务器,地址如下:,这个URL的返回内容如下: +客户端启动后,会定时访问一个静态HTTP服务器,地址如下:,这个URL的返回内容如下: ```text 192.168.0.1:9876;192.168.0.2:9876 @@ -236,7 +236,7 @@ export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876 DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPullConsumer都继承于ClientConfig类,ClientConfig为客户端的公共配置类。客户端的配置都是get、set形式,每个参数都可以用spring来配置,也可以在代码中配置,例如namesrvAddr这个参数可以这样配置,producer.setNamesrvAddr("192.168.0.1:9876"),其他参数同理。 -####1 客户端的公共配置 +#### 1 客户端的公共配置 | 参数名 | 默认值 | 说明 | | ----------------------------- | ------- | ------------------------------------------------------------ | @@ -248,7 +248,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul | heartbeatBrokerInterval | 30000 | 向Broker发送心跳间隔时间,单位毫秒 | | persistConsumerOffsetInterval | 5000 | 持久化Consumer消费进度间隔时间,单位毫秒 | -####2 Producer配置 +#### 2 Producer配置 | 参数名 | 默认值 | 说明 | | -------------------------------- | ---------------- | ------------------------------------------------------------ | @@ -266,7 +266,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul | checkRequestHoldMax | 2000 | Broker回查Producer事务状态时,Producer本地缓冲请求队列大小 | | RPCHook | null | 该参数是在Producer创建时传入的,包含消息发送前的预处理和消息响应后的处理两个接口,用户可以在第一个接口中做一些安全控制或者其他操作。 | -####3 PushConsumer配置 +#### 3 PushConsumer配置 | 参数名 | 默认值 | 说明 | | ---------------------------- | ----------------------------- | ------------------------------------------------------------ | @@ -287,7 +287,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul | consumeMessageBatchMaxSize | 1 | 批量消费,一次消费多少条消息 | | pullBatchSize | 32 | 批量拉消息,一次最多拉多少条 | -####4 PullConsumer配置 +#### 4 PullConsumer配置 | 参数名 | 默认值 | 说明 | | -------------------------------- | ----------------------------- | ------------------------------------------------------------ | @@ -301,7 +301,7 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul | registerTopics | | 注册的topic集合 | | allocateMessageQueueStrategy | AllocateMessageQueueAveragely | Rebalance算法实现策略 | -####5 Message数据结构 +#### 5 Message数据结构 | 字段名 | 默认值 | 说明 | | -------------- | ------ | ------------------------------------------------------------ | @@ -313,8 +313,8 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul | DelayTimeLevel | 0 | 选填,消息延时级别,0表示不延时,大于0会延时特定的时间才会被消费 | | WaitStoreMsgOK | TRUE | 选填,表示消息是否在服务器落盘后才返回应答。 | - -​ + +​ ​ ## 6 系统配置 @@ -323,47 +323,47 @@ DefaultMQProducer、TransactionMQProducer、DefaultMQPushConsumer、DefaultMQPul ### 6.1 JVM选项 -​ 推荐使用最新发布的JDK 1.8版本。通过设置相同的Xms和Xmx值来防止JVM调整堆大小以获得更好的性能。简单的JVM配置如下所示: -​ -​``` +​ 推荐使用最新发布的JDK 1.8版本。通过设置相同的Xms和Xmx值来防止JVM调整堆大小以获得更好的性能。简单的JVM配置如下所示: +​ +​``` ​ -​-server -Xms8g -Xmx8g -Xmn4g -​ ``` -​ +​-server -Xms8g -Xmx8g -Xmn4g +​ ``` +​ ​ 如果您不关心RocketMQ Broker的启动时间,还有一种更好的选择,就是通过“预触摸”Java堆以确保在JVM初始化期间每个页面都将被分配。那些不关心启动时间的人可以启用它: ​ -XX:+AlwaysPreTouch 禁用偏置锁定可能会减少JVM暂停, ​ -XX:-UseBiasedLocking -至于垃圾回收,建议使用带JDK 1.8的G1收集器。 +至于垃圾回收,建议使用带JDK 1.8的G1收集器。 ```text -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -``` +``` ​ 这些GC选项看起来有点激进,但事实证明它在我们的生产环境中具有良好的性能。另外不要把-XX:MaxGCPauseMillis的值设置太小,否则JVM将使用一个小的年轻代来实现这个目标,这将导致非常频繁的minor GC,所以建议使用rolling GC日志文件: - + ```text -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m ``` - -如果写入GC文件会增加代理的延迟,可以考虑将GC日志文件重定向到内存文件系统: + +如果写入GC文件会增加代理的延迟,可以考虑将GC日志文件重定向到内存文件系统: ```text -Xloggc:/dev/shm/mq_gc_%p.log123 ``` ### 6.2 Linux内核参数 -​ os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。下面的参数需要注意,更多细节请参考/proc/sys/vm/*的[文档](https://www.kernel.org/doc/Documentation/sysctl/vm.txt) - +​ os.sh脚本在bin文件夹中列出了许多内核参数,可以进行微小的更改然后用于生产用途。下面的参数需要注意,更多细节请参考/proc/sys/vm/*的[文档](https://www.kernel.org/doc/Documentation/sysctl/vm.txt) + - **vm.extra_free_kbytes**,告诉VM在后台回收(kswapd)启动的阈值与直接回收(通过分配进程)的阈值之间保留额外的可用内存。RocketMQ使用此参数来避免内存分配中的长延迟。(与具体内核版本相关) -- **vm.min_free_kbytes**,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。 -- **vm.max_map_count**,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness) -- **vm.swappiness**,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。 -- **File descriptor limits**,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。 +- **vm.min_free_kbytes**,如果将其设置为低于1024KB,将会巧妙的将系统破坏,并且系统在高负载下容易出现死锁。 +- **vm.max_map_count**,限制一个进程可能具有的最大内存映射区域数。RocketMQ将使用mmap加载CommitLog和ConsumeQueue,因此建议将为此参数设置较大的值。(agressiveness --> aggressiveness) +- **vm.swappiness**,定义内核交换内存页面的积极程度。较高的值会增加攻击性,较低的值会减少交换量。建议将值设置为10来避免交换延迟。 +- **File descriptor limits**,RocketMQ需要为文件(CommitLog和ConsumeQueue)和网络连接打开文件描述符。我们建议设置文件描述符的值为655350。 - [Disk scheduler](https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/ch06s04s02.html),RocketMQ建议使用I/O截止时间调度器,它试图为请求提供有保证的延迟。 []([]()) \ No newline at end of file