diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md index f45221f99fd5016154501b1b0851b245c45dfa1b..539b3c234d4b3ebef9cd7785fb17c6002e01f3bd 100644 --- a/docs/cn/RocketMQ_Example.md +++ b/docs/cn/RocketMQ_Example.md @@ -1,7 +1,4 @@ -样例(sample) -============ - -基本样例 +1 基本样例 -------- 在基本样例中我们提供如下的功能场景: @@ -9,23 +6,23 @@ * 使用RocketMQ发送三种类型的消息:同步消息,异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。 * 使用RocketMQ来消费接收到的消息。 -### 1、加入依赖: +### 1.1 加入依赖: `maven:` ``` - + org.apache.rocketmq rocketmq-client - 4.3.0 + 4.3.0 ``` `gradle` ``` compile 'org.apache.rocketmq:rocketmq-client:4.3.0' ``` -### 2、消息发送 +### 1.2 消息发送 -#### 1. Producer端发送同步消息 +#### 1、Producer端发送同步消息 这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。 ```java @@ -53,7 +50,7 @@ public class SyncProducer { } } ``` -#### 2. 发送异步消息 +#### 2、发送异步消息 异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。 @@ -94,7 +91,7 @@ public class AsyncProducer { } ``` -#### 3. 单向发送消息 +#### 3、单向发送消息 这种方式主要用在不特别关心发送结果的场景,例如日志发送。 @@ -123,7 +120,7 @@ public class OnewayProducer { } ``` -### 3、消费消息 +### 1.3 消费消息 ```java public class Consumer { @@ -154,7 +151,7 @@ public class Consumer { } ``` -顺序消息样例 +2 顺序消息样例 ---------- 消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。 @@ -163,7 +160,7 @@ public class Consumer { 下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。 -### 1、顺序消息生产 +### 2.1 顺序消息生产 ```java package org.apache.rocketmq.example.order2; @@ -315,7 +312,7 @@ public class Producer { } ``` -### 2、顺序消费消息 +### 2.2 顺序消费消息 ```java import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; @@ -383,10 +380,10 @@ public class ConsumerInOrder { } ``` -延时消息样例 +3 延时消息样例 ---------- -### 1、启动消费者等待传入订阅消息 +### 3.1 启动消费者等待传入订阅消息 ```java @@ -421,7 +418,7 @@ public class ScheduledMessageConsumer { ``` -### 2、发送延时消息 +### 3.2 发送延时消息 ```java @@ -448,14 +445,14 @@ public class ScheduledMessageProducer { } ``` -### 3、验证 +### 3.3 验证 您将会看到消息的消费比存储时间晚10秒。 -### 4、延时消息的使用场景 +### 3.4 延时消息的使用场景 1. 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。 -### 5、延时消息的使用限制 +### 3.5 延时消息的使用限制 ```java // org/apache/rocketmq/store/config/MessageStoreConfig.java @@ -466,13 +463,12 @@ private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码`SendMessageProcessor.java` - -批量消息样例 +4 批量消息样例 ---------- 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。 -### 1、发送批量消息 +### 4.1 发送批量消息 如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下: @@ -491,7 +487,7 @@ try { ``` -### 2、消息列表分割 +### 4.2 消息列表分割 复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下: @@ -552,7 +548,7 @@ while (splitter.hasNext()) { } ``` -过滤消息样例 +5 过滤消息样例 ---------- 在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如: @@ -579,7 +575,7 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC"); | c = true | ------------ ``` -### 1、基本语法 +### 5.1 基本语法 RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。 @@ -600,9 +596,9 @@ RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容 public void subscribe(finalString topic, final MessageSelector messageSelector) ``` -### 2、使用样例 +### 5.2 使用样例 -#### 1. 生产者样例 +#### 1、生产者样例 发送消息时,你能通过`putUserProperty`来设置消息的属性 @@ -620,7 +616,7 @@ SendResult sendResult = producer.send(msg); producer.shutdown(); ``` -#### 2. 消费者样例 +#### 2、消费者样例 用MessageSelector.bySql来使用sql筛选消息 @@ -638,7 +634,7 @@ consumer.start(); ``` -消息事务样例 +6 消息事务样例 ---------- 事务消息共有三种状态,提交状态、回滚状态、中间状态: @@ -647,14 +643,13 @@ consumer.start(); 2. TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。 3. TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。 -### 1、发送事务消息样例 +### 6.1 发送事务消息样例 -#### 1. 创建事务性生产者 +#### 1、创建事务性生产者 使用 `TransactionMQProducer`类创建生产者,并指定唯一的 `ProducerGroup`,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。 ```java - import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -697,7 +692,7 @@ public class TransactionProducer { } ``` -#### 2. 实现事务的监听接口 +#### 2、实现事务的监听接口 当发送半消息成功时,我们使用 `executeLocalTransaction` 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。`checkLocalTranscation` 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。 @@ -731,7 +726,7 @@ public class TransactionListenerImpl implements TransactionListener { ``` -### 2、事务消息使用上的限制 +### 6.2 事务消息使用上的限制 1. 事务消息不支持延时消息和批量消息。 2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。 @@ -740,75 +735,75 @@ public class TransactionListenerImpl implements TransactionListener { 5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。 6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。 -Logappender样例 +7 Logappender样例 ----------------- RocketMQ日志提供log4j、log4j2和logback日志框架作为业务应用,下面是配置样例 -### 1、log4j样例 +### 7.1 log4j样例 按下面样例使用log4j属性配置 ``` -log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender -log4j.appender.mq.Tag=yourTag -log4j.appender.mq.Topic=yourLogTopic -log4j.appender.mq.ProducerGroup=yourLogGroup -log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress -log4j.appender.mq.layout=org.apache.log4j.PatternLayout +log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender +log4j.appender.mq.Tag=yourTag +log4j.appender.mq.Topic=yourLogTopic +log4j.appender.mq.ProducerGroup=yourLogGroup +log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress +log4j.appender.mq.layout=org.apache.log4j.PatternLayout log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n ``` 按下面样例使用log4j xml配置来使用异步添加日志 ``` - -      - - + +      + + ``` -### 2、log4j2样例 +### 7.2 log4j2样例 用log4j2时,配置如下,如果想要非阻塞,只需要使用异步添加引用即可 ``` - + ``` -### 3、logback样例 -``` -yourTagyourLogTopicyourLogGroupyourRocketmqNameserverAddress -      %date %p %t - %m%n - -1024802000true - -``` - -OpenMessaging样例 +### 7.3 logback样例 +``` +yourTagyourLogTopicyourLogGroupyourRocketmqNameserverAddress +      %date %p %t - %m%n + +1024802000true + +``` + +8 OpenMessaging样例 --------------- [OpenMessaging](https://www.google.com/url?q=http://openmessaging.cloud/&sa=D&ust=1546524111089000)旨在建立消息和流处理规范,以为金融、电子商务、物联网和大数据领域提供通用框架及工业级指导方案。在分布式异构环境中,设计原则是面向云、简单、灵活和独立于语言。符合这些规范将帮助企业方便的开发跨平台和操作系统的异构消息传递应用程序。提供了openmessaging-api 0.3.0-alpha的部分实现,下面的示例演示如何基于OpenMessaging访问RocketMQ。 -### 1、OMSProducer样例 +### 8.1 OMSProducer样例 下面的示例演示如何在同步、异步或单向传输中向RocketMQ代理发送消息。 @@ -867,7 +862,7 @@ public class SimpleProducer { } ``` -### 2、OMSPullConsumer +### 8.2 OMSPullConsumer 用OMS PullConsumer 来从指定的队列中拉取消息 @@ -920,7 +915,7 @@ public class SimplePullConsumer { } ``` -### 3、OMSPushConsumer +### 8.3 OMSPushConsumer 以下示范如何将 OMS PushConsumer 添加到指定的队列,并通过 MessageListener 消费这些消息。