From 07ac4433a9693fe8efbe6370cd538e45d6bd9e27 Mon Sep 17 00:00:00 2001 From: yejunyu Date: Mon, 21 Jan 2019 16:20:51 +0800 Subject: [PATCH] 1. Optimized formatting and indentation 2. Corrected some typos 3. Introduce of the Schedule Message --- docs/cn/RocketMQ_Example.md | 128 +++++++++++++++++++----------------- 1 file changed, 66 insertions(+), 62 deletions(-) diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md index a7068126..5728ed4f 100644 --- a/docs/cn/RocketMQ_Example.md +++ b/docs/cn/RocketMQ_Example.md @@ -38,7 +38,7 @@ public class SyncProducer { // 启动Producer实例 producer.start(); for (int i = 0; i < 100; i++) { - // 创建消息,并指定Topic,Tag和消息体 + // 创建消息,并指定Topic,Tag和消息体 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ @@ -46,11 +46,11 @@ public class SyncProducer { // 发送消息到一个Broker SendResult sendResult = producer.send(msg); // 通过sendResult返回消息是否成功送达 - System.out.printf("%s%n", sendResult); + System.out.printf("%s%n", sendResult); } // 如果不再发送消息,关闭Producer实例。 - producer.shutdown(); - } + producer.shutdown(); + } } ``` #### 2. 发送异步消息 @@ -89,8 +89,8 @@ public class AsyncProducer { }); } // 如果不再发送消息,关闭Producer实例。 - producer.shutdown(); - } + producer.shutdown(); + } } ``` @@ -99,7 +99,6 @@ public class AsyncProducer { 这种方式主要用在不特别关心发送结果的场景,例如日志发送。 ```java - public class OnewayProducer { public static void main(String[] args) throws Exception{ // 实例化消息生产者Producer @@ -116,50 +115,45 @@ public class OnewayProducer { ); // 发送单向消息,没有任何返回结果 producer.sendOneway(msg); - + } // 如果不再发送消息,关闭Producer实例。 - producer.shutdown(); - } + producer.shutdown(); + } +} ``` ### 3、消费消息 ```java - public class Consumer { - + public static void main(String[] args) throws InterruptedException, MQClientException { - + // 实例化消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); - + // 设置NameServer的地址 consumer.setNamesrvAddr("localhost:9876"); - + // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息 consumer.subscribe("TopicTest", "*"); // 注册回调实现类来处理从broker拉取回来的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { + public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); - // 标记该消息已经被成功消费 + // 标记该消息已经被成功消费 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - }); - - // 启动消费者实例 + } + }); + // 启动消费者实例 consumer.start(); - System.out.printf("Consumer Started.%n"); } } - ``` - 顺序消息样例 ---------- @@ -167,12 +161,11 @@ public class Consumer { 顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。 -下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。 +下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。 ### 1、顺序消息生产 ```java - package org.apache.rocketmq.example.order2; import org.apache.rocketmq.client.producer.DefaultMQProducer; @@ -320,13 +313,11 @@ public class Producer { return orderList; } } - ``` ### 2、顺序消费消息 ```java - import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; @@ -390,7 +381,6 @@ public class ConsumerInOrder { System.out.println("Consumer Started."); } } - ``` 延时消息样例 @@ -462,6 +452,21 @@ public class ScheduledMessageProducer { 您将会看到消息的消费比存储时间晚10秒。 +### 4、延时消息的使用场景 +1. 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。 + +### 5、延时消息的使用限制 + +```java +// org/apache/rocketmq/store/config/MessageStoreConfig.java + +private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; +``` +现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 +消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码`SendMessageProcessor.java` + + + 批量消息样例 ---------- @@ -518,7 +523,7 @@ public class ListSplitter implements Iterator> { //忽略,否则会阻塞分裂的进程 if (nextIndex - currIndex == 0) { //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环 - nextIndex++; + nextIndex++; } break; } @@ -527,7 +532,7 @@ public class ListSplitter implements Iterator> { } else { totalSize += tmpSize; } - + } List subList = messages.subList(currIndex, nextIndex); currIndex = nextIndex; @@ -558,41 +563,41 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC"); ``` 消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子: - -\-\-\-\-\-\-\-\-\-\-\-\- -| message  | -|-------- | a > 5 AND b = 'abc' -| a = 10 |  --------------------> Gotten -| b = 'abc'| -| c = true| -\-\-\-\-\-\-\-\-\-\-\-\- -\-\-\-\-\-\-\-\-\-\-\-\- -| message  | -|-------- | a > 5 AND b = 'abc' -| a = 1 |  --------------------> Missed -| b = 'abc'| -| c = true | -\-\-\-\-\-\-\-\-\-\-\-\- - +``` +------------ +| message | +|----------| a > 5 AND b = 'abc' +| a = 10 | --------------------> Gotten +| b = 'abc'| +| c = true | +------------ +------------ +| message | +|----------| a > 5 AND b = 'abc' +| a = 1 | --------------------> Missed +| b = 'abc'| +| c = true | +------------ +``` ### 1、基本语法 RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。 -1. 数值比较,比如:**>,>=,<,<=,BETWEEN,=;** -2. 字符比较,比如:**=,<>,IN;** -3. **IS NULL** 或者 **IS NOT NULL;** -4. 逻辑符号 **AND,OR,NOT;** +- 数值比较,比如:**>,>=,<,<=,BETWEEN,=;** +- 字符比较,比如:**=,<>,IN;** +- **IS NULL** 或者 **IS NOT NULL;** +- 逻辑符号 **AND,OR,NOT;** 常量支持类型为: -1. 数值,比如:**123,3.1415;** -2. 字符,比如:**'abc',必须用单引号包裹起来;** -3. **NULL**,特殊的常量 -4. 布尔值,**TRUE** 或 **FALSE** +- 数值,比如:**123,3.1415;** +- 字符,比如:**'abc',必须用单引号包裹起来;** +- **NULL**,特殊的常量 +- 布尔值,**TRUE** 或 **FALSE** 只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下: ``` -publicvoidsubscribe(finalString topic, final MessageSelector messageSelector) +public void subscribe(finalString topic, final MessageSelector messageSelector) ``` ### 2、使用样例 @@ -611,7 +616,7 @@ Message msg = new Message("TopicTest", // 设置一些属性 msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); - + producer.shutdown(); ``` @@ -646,7 +651,7 @@ consumer.start(); #### 1. 创建事务性生产者 -使用 `TransactionMQProducer`类创建生产者客户,并指定唯一的 `ProducerGroup`,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。 +使用 `TransactionMQProducer`类创建生产者,并指定唯一的 `ProducerGroup`,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。 ```java @@ -726,14 +731,14 @@ public class TransactionListenerImpl implements TransactionListener { ``` -### 1、事务消息使用上的限制 +### 2、事务消息使用上的限制 1. 事务消息不支持延时消息和批量消息。 2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。 3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 `transactionMsgTimeout` 参数。 4. 事务性消息可能不止一次被检查或消费。 5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。 -6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询客户。 +6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。 Logappender样例 ----------------- @@ -953,5 +958,4 @@ public class SimplePushConsumer { System.out.printf("Consumer startup OK%n"); } } - ``` \ No newline at end of file -- GitLab