Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
71fcc075
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
71fcc075
编写于
1月 04, 2019
作者:
神奇大叶子
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Modified some typos
上级
9d0e4296
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
49 addition
and
49 deletion
+49
-49
docs/cn/RocketMQ_Example.md
docs/cn/RocketMQ_Example.md
+49
-49
未找到文件。
docs/cn/RocketMQ
Example.md
→
docs/cn/RocketMQ
_
Example.md
浏览文件 @
71fcc075
样例(sample)
============
##
基本样例
基本样例
--------
在基本样例中我们提供如下的功能场景:
...
...
@@ -9,23 +9,23 @@
*
使用RocketMQ发送三种类型的消息: 同步消息,异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
*
使用RocketMQ来消费接收到的消息。
### 加入依赖:
###
1、
加入依赖:
`maven:`
```
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
</dependency>
```
`gradle`
```
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
```
### 消息发送
###
2、
消息发送
1
Producer端发送同步消息
#### 1.
Producer端发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
```
java
...
...
@@ -38,24 +38,24 @@ public class SyncProducer {
// 启动Producer实例
producer
.
start
();
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
// 创建消息,并指定
topic, t
ag和消息体
// 创建消息,并指定
Topic,T
ag和消息体
Message
msg
=
new
Message
(
"TopicTest"
/* Topic */
,
"TagA"
/* Tag */
,
(
"Hello RocketMQ "
+
i
).
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
)
/* Message body */
);
// 发送消息到一个
b
roker
// 发送消息到一个
B
roker
SendResult
sendResult
=
producer
.
send
(
msg
);
// 通过sendResult返回消息是否成功送达
System
.
out
.
printf
(
"%s%n"
,
sendResult
);
}
// 如果不再发送消息,关闭
producer实例.
// 如果不再发送消息,关闭
Producer实例。
producer
.
shutdown
();
}
}
```
2
发送异步消息
#### 2.
发送异步消息
异步消息通常用在对应答时间敏感的业务场景,即发送端不能容忍长时间地等待broker的相
应。
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响
应。
```
java
public
class
AsyncProducer
{
...
...
@@ -69,7 +69,7 @@ public class AsyncProducer {
producer
.
setRetryTimesWhenSendAsyncFailed
(
0
);
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
final
int
index
=
i
;
// 创建消息,并指定
topic, t
ag和消息体
// 创建消息,并指定
Topic,T
ag和消息体
Message
msg
=
new
Message
(
"TopicTest"
,
"TagA"
,
"OrderID188"
,
...
...
@@ -88,15 +88,15 @@ public class AsyncProducer {
}
});
}
// 如果不再发送消息,关闭
producer实例.
// 如果不再发送消息,关闭
Producer实例。
producer
.
shutdown
();
}
}
```
3
单向发送消息
#### 3.
单向发送消息
这种方式主要用在不特别关
系
发送结果的场景,例如日志发送。
这种方式主要用在不特别关
心
发送结果的场景,例如日志发送。
```
java
...
...
@@ -109,7 +109,7 @@ public class OnewayProducer {
// 启动Producer实例
producer
.
start
();
for
(
int
i
=
0
;
i
<
100
;
i
++)
{
// 创建消息,并指定
topic, t
ag和消息体
// 创建消息,并指定
Topic,T
ag和消息体
Message
msg
=
new
Message
(
"TopicTest"
/* Topic */
,
"TagA"
/* Tag */
,
(
"Hello RocketMQ "
+
i
).
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
)
/* Message body */
...
...
@@ -118,12 +118,12 @@ public class OnewayProducer {
producer
.
sendOneway
(
msg
);
}
// 如果不再发送消息,关闭
producer实例.
// 如果不再发送消息,关闭
Producer实例。
producer
.
shutdown
();
}
```
### 消费消息
###
3、
消费消息
```
java
...
...
@@ -137,7 +137,7 @@ public class Consumer {
// 设置NameServer的地址
consumer
.
setNamesrvAddr
(
"localhost:9876"
);
// 订阅一个或者多个
topic,以及t
ag来过滤需要消费的消息
// 订阅一个或者多个
Topic,以及T
ag来过滤需要消费的消息
consumer
.
subscribe
(
"TopicTest"
,
"*"
);
// 注册回调实现类来处理从broker拉取回来的消息
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
...
...
@@ -163,13 +163,13 @@ public class Consumer {
顺序消息样例
----------
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,
可以分为分区有序或者全局有序。
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,
可以分为分区有序或者全局有序。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,
消息都是有序的。
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,
消息都是有序的。
下面用订单进行示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
### 顺序消息生产
###
1、
顺序消息生产
```
java
...
...
@@ -209,7 +209,7 @@ public class Producer {
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
// 加个时间前缀
String
body
=
dateStr
+
" Hello RocketMQ "
+
orderList
.
get
(
i
);
Message
msg
=
new
Message
(
"TopicTest
jjj
"
,
tags
[
i
%
tags
.
length
],
"KEY"
+
i
,
body
.
getBytes
());
Message
msg
=
new
Message
(
"TopicTest"
,
tags
[
i
%
tags
.
length
],
"KEY"
+
i
,
body
.
getBytes
());
SendResult
sendResult
=
producer
.
send
(
msg
,
new
MessageQueueSelector
()
{
@Override
...
...
@@ -323,7 +323,7 @@ public class Producer {
```
### 顺序消费消息
###
2、
顺序消费消息
```
java
...
...
@@ -361,7 +361,7 @@ public class ConsumerInOrder {
*/
consumer
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
consumer
.
subscribe
(
"TopicTest
jjj
"
,
"TagA || TagC || TagD"
);
consumer
.
subscribe
(
"TopicTest"
,
"TagA || TagC || TagD"
);
consumer
.
registerMessageListener
(
new
MessageListenerOrderly
()
{
...
...
@@ -396,7 +396,7 @@ public class ConsumerInOrder {
延时消息样例
----------
### 启动消费者等待传入订阅消息
###
1、
启动消费者等待传入订阅消息
```
java
...
...
@@ -411,7 +411,7 @@ public class ScheduledMessageConsumer {
public
static
void
main
(
String
[]
args
)
throws
Exception
{
// 实例化消费者
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"ExampleConsumer"
);
// 订阅
t
opics
// 订阅
T
opics
consumer
.
subscribe
(
"TestTopic"
,
"*"
);
// 注册消息监听者
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
...
...
@@ -431,7 +431,7 @@ public class ScheduledMessageConsumer {
```
### 发送延时消息
###
2、
发送延时消息
```
java
...
...
@@ -458,16 +458,16 @@ public class ScheduledMessageProducer {
}
```
### 验证
###
3、
验证
您将会看到消息的消费比存储时间晚10秒。
批量消息样例
----------
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic
,相同的waitStoreMsgOK,
而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic
,相同的waitStoreMsgOK,
而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
### 发送批量消息
###
1、
发送批量消息
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
...
...
@@ -486,7 +486,7 @@ try {
```
### 消息列表分割
###
2、
消息列表分割
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
...
...
@@ -574,7 +574,7 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
| c = true |
\-\-\-\-\-\-\-\-\-\-\-\-
### 基本语法
###
1、
基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
...
...
@@ -595,11 +595,11 @@ RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容
publicvoidsubscribe(finalString topic, final MessageSelector messageSelector)
```
### 使用样例
###
2、
使用样例
1)
生产者样例
#### 1.
生产者样例
发送消息时,你能通过
`putUserProperty`
来设置消息的属性
发送消息时,你能通过
`putUserProperty`
来设置消息的属性
```
java
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
);
...
...
@@ -615,9 +615,9 @@ SendResult sendResult = producer.send(msg);
producer
.
shutdown
();
```
2)
消费者样例
#### 2.
消费者样例
用MessageSelector.bys
ql来使用sql筛选消息
用MessageSelector.byS
ql来使用sql筛选消息
```
java
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_4"
);
...
...
@@ -642,9 +642,9 @@ consumer.start();
2.
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
3.
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
### 发送事务消息样例
###
1、
发送事务消息样例
1
创建事务性生产者
#### 1.
创建事务性生产者
使用
`TransactionMQProducer`
类创建生产者客户,并指定唯一的
`ProducerGroup`
,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
...
...
@@ -692,7 +692,7 @@ public class TransactionProducer {
}
```
2
实现事务的监听接口
#### 2.
实现事务的监听接口
当发送半消息成功时,我们使用
`executeLocalTransaction`
方法来执行本地事务。它返回前一节中提到的三个事务状态之一。
`checkLocalTranscation`
方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
...
...
@@ -726,7 +726,7 @@ public class TransactionListenerImpl implements TransactionListener {
```
### 事务消息使用上的限制
###
1、
事务消息使用上的限制
1.
事务消息不支持延时消息和批量消息。
2.
为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的
`transactionCheckMax`
参数来修改此限制。如果已经检查某条消息超过 N 次的话( N =
`transactionCheckMax`
) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写
`AbstractTransactionCheckListener`
类来修改这个行为。
...
...
@@ -740,7 +740,7 @@ Logappender样例
RocketMQ日志提供log4j、log4j2和logback日志框架作为业务应用,下面是配置样例
### log4j样例
###
1、
log4j样例
按下面样例使用log4j属性配置
```
...
...
@@ -769,7 +769,7 @@ log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F
<appender-ref ref="mqAppender1"/>
</appender>
```
### log4j2样例
###
2、
log4j2样例
用log4j2时,配置如下,如果想要非阻塞,只需要使用异步添加引用即可
```
...
...
@@ -778,7 +778,7 @@ log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F
<PatternLayout pattern="%d [%p] hahahah %c %m%n"/>
</RocketMQ>
```
### logback样例
###
3、
logback样例
```
<appender name="mqAppender1"class="org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender">
<tag>yourTag</tag>
...
...
@@ -803,7 +803,7 @@ Open Messaging样例
[
OpenMessaging
](
https://www.google.com/url?q=http://openmessaging.cloud/&sa=D&ust=1546524111089000
)
旨在建立消息和流处理规范,以为金融、电子商务、物联网和大数据领域提供通用框架及工业级指导方案。在分布式异构环境中,设计原则是面向云、简单、灵活和独立于语言。符合这些规范将帮助企业方便的开发跨平台和操作系统的异构消息传递应用程序。提供了openmessaging-api 0.3.0-alpha的部分实现,下面的示例演示如何基于OpenMessaging访问RocketMQ。
### OMSProducer样例
###
1、
OMSProducer样例
下面的示例演示如何在同步、异步或单向传输中向RocketMQ代理发送消息。
...
...
@@ -862,7 +862,7 @@ public class SimpleProducer {
}
```
### OMSPullConsumer
###
2、
OMSPullConsumer
用OMS PullConsumer 来从指定的队列中拉取消息
...
...
@@ -915,7 +915,7 @@ public class SimplePullConsumer {
}
```
### OMSPushConsumer
###
3、
OMSPushConsumer
以下示范如何将 OMS PushConsumer 添加到指定的队列,并通过 MessageListener 消费这些消息。
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录