Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
07ac4433
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
07ac4433
编写于
1月 21, 2019
作者:
神奇大叶子
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
1. Optimized formatting and indentation
2. Corrected some typos 3. Introduce of the Schedule Message
上级
acb98ec6
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
66 addition
and
62 deletion
+66
-62
docs/cn/RocketMQ_Example.md
docs/cn/RocketMQ_Example.md
+66
-62
未找到文件。
docs/cn/RocketMQ_Example.md
浏览文件 @
07ac4433
...
@@ -99,7 +99,6 @@ public class AsyncProducer {
...
@@ -99,7 +99,6 @@ public class AsyncProducer {
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
```
java
```
java
public
class
OnewayProducer
{
public
class
OnewayProducer
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 实例化消息生产者Producer
// 实例化消息生产者Producer
...
@@ -121,12 +120,12 @@ public class OnewayProducer {
...
@@ -121,12 +120,12 @@ public class OnewayProducer {
// 如果不再发送消息,关闭Producer实例。
// 如果不再发送消息,关闭Producer实例。
producer
.
shutdown
();
producer
.
shutdown
();
}
}
}
```
```
### 3、消费消息
### 3、消费消息
```
java
```
java
public
class
Consumer
{
public
class
Consumer
{
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
,
MQClientException
{
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
,
MQClientException
{
...
@@ -141,7 +140,6 @@ public class Consumer {
...
@@ -141,7 +140,6 @@ public class Consumer {
consumer
.
subscribe
(
"TopicTest"
,
"*"
);
consumer
.
subscribe
(
"TopicTest"
,
"*"
);
// 注册回调实现类来处理从broker拉取回来的消息
// 注册回调实现类来处理从broker拉取回来的消息
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
@Override
@Override
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
msgs
,
ConsumeConcurrentlyContext
context
)
{
public
ConsumeConcurrentlyStatus
consumeMessage
(
List
<
MessageExt
>
msgs
,
ConsumeConcurrentlyContext
context
)
{
System
.
out
.
printf
(
"%s Receive New Messages: %s %n"
,
Thread
.
currentThread
().
getName
(),
msgs
);
System
.
out
.
printf
(
"%s Receive New Messages: %s %n"
,
Thread
.
currentThread
().
getName
(),
msgs
);
...
@@ -149,17 +147,13 @@ public class Consumer {
...
@@ -149,17 +147,13 @@ public class Consumer {
return
ConsumeConcurrentlyStatus
.
CONSUME_SUCCESS
;
return
ConsumeConcurrentlyStatus
.
CONSUME_SUCCESS
;
}
}
});
});
// 启动消费者实例
// 启动消费者实例
consumer
.
start
();
consumer
.
start
();
System
.
out
.
printf
(
"Consumer Started.%n"
);
System
.
out
.
printf
(
"Consumer Started.%n"
);
}
}
}
}
```
```
顺序消息样例
顺序消息样例
----------
----------
...
@@ -167,12 +161,11 @@ public class Consumer {
...
@@ -167,12 +161,11 @@ public class Consumer {
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
下面用订单进行
分区有序的
示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
### 1、顺序消息生产
### 1、顺序消息生产
```
java
```
java
package
org.apache.rocketmq.example.order2
;
package
org.apache.rocketmq.example.order2
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
...
@@ -320,13 +313,11 @@ public class Producer {
...
@@ -320,13 +313,11 @@ public class Producer {
return
orderList
;
return
orderList
;
}
}
}
}
```
```
### 2、顺序消费消息
### 2、顺序消费消息
```
java
```
java
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
;
...
@@ -390,7 +381,6 @@ public class ConsumerInOrder {
...
@@ -390,7 +381,6 @@ public class ConsumerInOrder {
System
.
out
.
println
(
"Consumer Started."
);
System
.
out
.
println
(
"Consumer Started."
);
}
}
}
}
```
```
延时消息样例
延时消息样例
...
@@ -462,6 +452,21 @@ public class ScheduledMessageProducer {
...
@@ -462,6 +452,21 @@ public class ScheduledMessageProducer {
您将会看到消息的消费比存储时间晚10秒。
您将会看到消息的消费比存储时间晚10秒。
### 4、延时消息的使用场景
1.
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
### 5、延时消息的使用限制
```
java
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private
String
messageDelayLevel
=
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
;
```
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18
消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码
`SendMessageProcessor.java`
批量消息样例
批量消息样例
----------
----------
...
@@ -558,41 +563,41 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
...
@@ -558,41 +563,41 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
```
```
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
```
\-\-\-\-\-\-\-\-\-\-\-\-
------------
| message
|
| message
|
|--------
|
a > 5 AND b = 'abc'
|--------
--|
a > 5 AND b = 'abc'
| a = 10 |
--------------------> Gotten
| a = 10 |
--------------------> Gotten
| b = 'abc'|
| b = 'abc'|
| c = true
|
| c = true
|
\-\-\-\-\-\-\-\-\-\-\-\-
------------
\-\-\-\-\-\-\-\-\-\-\-\-
------------
| message
|
| message
|
|--------
|
a > 5 AND b = 'abc'
|--------
--|
a > 5 AND b = 'abc'
| a = 1
| --------------------> Missed
| a = 1
| --------------------> Missed
| b = 'abc'|
| b = 'abc'|
| c = true
|
| c = true
|
\-\-\-\-\-\-\-\-\-\-\-\
-
-----------
-
```
### 1、基本语法
### 1、基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
1.
数值比较,比如:
**>,>=,<,<=,BETWEEN,=;**
-
数值比较,比如:**>,>=,<,<=,BETWEEN,=;**
2.
字符比较,比如:
**=,<>,IN;**
-
字符比较,比如:**=,<>,IN;**
3.
**IS NULL**
或者
**IS NOT NULL;**
-
**IS NULL** 或者 **IS NOT NULL;**
4.
逻辑符号
**AND,OR,NOT;**
-
逻辑符号 **AND,OR,NOT;**
常量支持类型为:
常量支持类型为:
1.
数值,比如:
**123,3.1415;**
-
数值,比如:**123,3.1415;**
2.
字符,比如:
**'abc',必须用单引号包裹起来;**
-
字符,比如:**'abc',必须用单引号包裹起来;**
3.
**NULL**
,特殊的常量
-
**NULL**,特殊的常量
4.
布尔值,
**TRUE**
或
**FALSE**
-
布尔值,**TRUE** 或 **FALSE**
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
```
```
public
void
subscribe(finalString topic, final MessageSelector messageSelector)
public
void
subscribe(finalString topic, final MessageSelector messageSelector)
```
```
### 2、使用样例
### 2、使用样例
...
@@ -646,7 +651,7 @@ consumer.start();
...
@@ -646,7 +651,7 @@ consumer.start();
#### 1. 创建事务性生产者
#### 1. 创建事务性生产者
使用
`TransactionMQProducer`
类创建生产者
客户
,并指定唯一的
`ProducerGroup`
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
使用 `TransactionMQProducer`类创建生产者,并指定唯一的 `ProducerGroup`,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
```
java
```
java
...
@@ -726,14 +731,14 @@ public class TransactionListenerImpl implements TransactionListener {
...
@@ -726,14 +731,14 @@ public class TransactionListenerImpl implements TransactionListener {
```
```
###
1
、事务消息使用上的限制
###
2
、事务消息使用上的限制
1. 事务消息不支持延时消息和批量消息。
1. 事务消息不支持延时消息和批量消息。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。
3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 `transactionMsgTimeout` 参数。
3. 事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 `transactionMsgTimeout` 参数。
4. 事务性消息可能不止一次被检查或消费。
4. 事务性消息可能不止一次被检查或消费。
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6.
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询客户
。
6.
事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者
。
Logappender样例
Logappender样例
-----------------
-----------------
...
@@ -953,5 +958,4 @@ public class SimplePushConsumer {
...
@@ -953,5 +958,4 @@ public class SimplePushConsumer {
System.out.printf("Consumer startup OK%n");
System.out.printf("Consumer startup OK%n");
}
}
}
}
```
```
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录