Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
168c68be
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
168c68be
编写于
1月 24, 2019
作者:
神奇大叶子
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
1. Modified the format
上级
146fc437
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
80 addition
and
85 deletion
+80
-85
docs/cn/RocketMQ_Example.md
docs/cn/RocketMQ_Example.md
+80
-85
未找到文件。
docs/cn/RocketMQ_Example.md
浏览文件 @
168c68be
样例(sample)
============
基本样例
1 基本样例
--------
在基本样例中我们提供如下的功能场景:
...
...
@@ -9,23 +6,23 @@
*
使用RocketMQ发送三种类型的消息:同步消息,异步消息和单向消息。其中前两种消息是可靠的,因为会有发送是否成功的应答。
*
使用RocketMQ来消费接收到的消息。
### 1
、
加入依赖:
### 1
.1
加入依赖:
`maven:`
```
<dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.3.0</version>
<version>4.3.0</version>
</dependency>
```
`gradle`
```
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
```
###
2、
消息发送
###
1.2
消息发送
#### 1
.
Producer端发送同步消息
#### 1
、
Producer端发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
```
java
...
...
@@ -53,7 +50,7 @@ public class SyncProducer {
}
}
```
#### 2
.
发送异步消息
#### 2
、
发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
...
...
@@ -94,7 +91,7 @@ public class AsyncProducer {
}
```
#### 3
.
单向发送消息
#### 3
、
单向发送消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送。
...
...
@@ -123,7 +120,7 @@ public class OnewayProducer {
}
```
###
3、
消费消息
###
1.3
消费消息
```
java
public
class
Consumer
{
...
...
@@ -154,7 +151,7 @@ public class Consumer {
}
```
顺序消息样例
2
顺序消息样例
----------
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。
...
...
@@ -163,7 +160,7 @@ public class Consumer {
下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。
###
1、
顺序消息生产
###
2.1
顺序消息生产
```
java
package
org.apache.rocketmq.example.order2
;
...
...
@@ -315,7 +312,7 @@ public class Producer {
}
```
### 2
、
顺序消费消息
### 2
.2
顺序消费消息
```
java
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
...
...
@@ -383,10 +380,10 @@ public class ConsumerInOrder {
}
```
延时消息样例
3
延时消息样例
----------
###
1、
启动消费者等待传入订阅消息
###
3.1
启动消费者等待传入订阅消息
```
java
...
...
@@ -421,7 +418,7 @@ public class ScheduledMessageConsumer {
```
###
2、
发送延时消息
###
3.2
发送延时消息
```
java
...
...
@@ -448,14 +445,14 @@ public class ScheduledMessageProducer {
}
```
### 3
、
验证
### 3
.3
验证
您将会看到消息的消费比存储时间晚10秒。
###
4、
延时消息的使用场景
###
3.4
延时消息的使用场景
1.
比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
###
5、
延时消息的使用限制
###
3.5
延时消息的使用限制
```
java
// org/apache/rocketmq/store/config/MessageStoreConfig.java
...
...
@@ -466,13 +463,12 @@ private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m
消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码
`SendMessageProcessor.java`
批量消息样例
4 批量消息样例
----------
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
###
1、
发送批量消息
###
4.1
发送批量消息
如果您每次只发送不超过4MB的消息,则很容易使用批处理,样例如下:
...
...
@@ -491,7 +487,7 @@ try {
```
###
2、
消息列表分割
###
4.2
消息列表分割
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
...
...
@@ -552,7 +548,7 @@ while (splitter.hasNext()) {
}
```
过滤消息样例
5
过滤消息样例
----------
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
...
...
@@ -579,7 +575,7 @@ consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
| c = true |
------------
```
###
1、
基本语法
###
5.1
基本语法
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
...
...
@@ -600,9 +596,9 @@ RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容
public void subscribe(finalString topic, final MessageSelector messageSelector)
```
###
2、
使用样例
###
5.2
使用样例
#### 1
.
生产者样例
#### 1
、
生产者样例
发送消息时,你能通过`putUserProperty`来设置消息的属性
...
...
@@ -620,7 +616,7 @@ SendResult sendResult = producer.send(msg);
producer.shutdown();
```
#### 2
.
消费者样例
#### 2
、
消费者样例
用MessageSelector.bySql来使用sql筛选消息
...
...
@@ -638,7 +634,7 @@ consumer.start();
```
消息事务样例
6
消息事务样例
----------
事务消息共有三种状态,提交状态、回滚状态、中间状态:
...
...
@@ -647,14 +643,13 @@ consumer.start();
2. TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
3. TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
###
1、
发送事务消息样例
###
6.1
发送事务消息样例
#### 1
.
创建事务性生产者
#### 1
、
创建事务性生产者
使用 `TransactionMQProducer`类创建生产者,并指定唯一的 `ProducerGroup`,就可以设置自定义线程池来处理这些检查请求。执行本地事务后、需要根据执行结果对消息队列进行回复。回传的事务状态在请参考前一节。
```
java
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
...
...
@@ -697,7 +692,7 @@ public class TransactionProducer {
}
```
#### 2
.
实现事务的监听接口
#### 2
、
实现事务的监听接口
当发送半消息成功时,我们使用 `executeLocalTransaction` 方法来执行本地事务。它返回前一节中提到的三个事务状态之一。`checkLocalTranscation` 方法用于检查本地事务状态,并回应消息队列的检查请求。它也是返回前一节中提到的三个事务状态之一。
...
...
@@ -731,7 +726,7 @@ public class TransactionListenerImpl implements TransactionListener {
```
###
2、
事务消息使用上的限制
###
6.2
事务消息使用上的限制
1. 事务消息不支持延时消息和批量消息。
2. 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 `transactionCheckMax`参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = `transactionCheckMax` ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 `AbstractTransactionCheckListener` 类来修改这个行为。
...
...
@@ -740,75 +735,75 @@ public class TransactionListenerImpl implements TransactionListener {
5. 提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
6. 事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
Logappender样例
7
Logappender样例
-----------------
RocketMQ日志提供log4j、log4j2和logback日志框架作为业务应用,下面是配置样例
###
1、
log4j样例
###
7.1
log4j样例
按下面样例使用log4j属性配置
```
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq=org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender
log4j.appender.mq.Tag=yourTag
log4j.appender.mq.Topic=yourLogTopic
log4j.appender.mq.ProducerGroup=yourLogGroup
log4j.appender.mq.NameServerAddress=yourRocketmqNameserverAddress
log4j.appender.mq.layout=org.apache.log4j.PatternLayout
log4j.appender.mq.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-4r [%t] (%F:%L) %-5p - %m%n
```
按下面样例使用log4j xml配置来使用异步添加日志
```
<appender
name=
"mqAppender1"
class=
"org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender"
>
<param
name=
"Tag"
value=
"yourTag"
/>
<param
name=
"Topic"
value=
"yourLogTopic"
/>
<param
name=
"ProducerGroup"
value=
"yourLogGroup"
/>
<param
name=
"NameServerAddress"
value=
"yourRocketmqNameserverAddress"
/>
<layout
class=
"org.apache.log4j.PatternLayout"
>
<param
name=
"ConversionPattern"
value=
"%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n"
/>
</layout>
</appender>
<appender
name=
"mqAsyncAppender1"
class=
"org.apache.log4j.AsyncAppender"
>
<param
name=
"BufferSize"
value=
"1024"
/>
<param
name=
"Blocking"
value=
"false"
/>
<appender-ref
ref=
"mqAppender1"
/>
<appender
name=
"mqAppender1"
class=
"org.apache.rocketmq.logappender.log4j.RocketmqLog4jAppender"
>
<param
name=
"Tag"
value=
"yourTag"
/>
<param
name=
"Topic"
value=
"yourLogTopic"
/>
<param
name=
"ProducerGroup"
value=
"yourLogGroup"
/>
<param
name=
"NameServerAddress"
value=
"yourRocketmqNameserverAddress"
/>
<layout
class=
"org.apache.log4j.PatternLayout"
>
<param
name=
"ConversionPattern"
value=
"%d{yyyy-MM-dd HH:mm:ss}-%p %t %c - %m%n"
/>
</layout>
</appender>
<appender
name=
"mqAsyncAppender1"
class=
"org.apache.log4j.AsyncAppender"
>
<param
name=
"BufferSize"
value=
"1024"
/>
<param
name=
"Blocking"
value=
"false"
/>
<appender-ref
ref=
"mqAppender1"
/>
</appender>
```
###
2、
log4j2样例
###
7.2
log4j2样例
用log4j2时,配置如下,如果想要非阻塞,只需要使用异步添加引用即可
```
<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
topic="yourLogTopic" tag="yourTag">
<PatternLayout
pattern=
"%d [%p] hahahah %c %m%n"
/>
<RocketMQ name="rocketmqAppender" producerGroup="yourLogGroup" nameServerAddress="yourRocketmqNameserverAddress"
topic="yourLogTopic" tag="yourTag">
<PatternLayout
pattern=
"%d [%p] hahahah %c %m%n"
/>
</RocketMQ>
```
###
3、
logback样例
```
<appender
name=
"mqAppender1"
class=
"org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender"
>
<tag>
yourTag
</tag>
<topic>
yourLogTopic
</topic>
<producerGroup>
yourLogGroup
</producerGroup>
<nameServerAddress>
yourRocketmqNameserverAddress
</nameServerAddress>
<layout>
<pattern>
%date %p %t - %m%n
</pattern>
</layout>
</appender>
<appender
name=
"mqAsyncAppender1"
class=
"ch.qos.logback.classic.AsyncAppender"
>
<queueSize>
1024
</queueSize>
<discardingThreshold>
80
</discardingThreshold>
<maxFlushTime>
2000
</maxFlushTime>
<neverBlock>
true
</neverBlock>
<appender-ref
ref=
"mqAppender1"
/>
</appender>
```
OpenMessaging样例
###
7.3
logback样例
```
<appender
name=
"mqAppender1"
class=
"org.apache.rocketmq.logappender.logback.RocketmqLogbackAppender"
>
<tag>
yourTag
</tag>
<topic>
yourLogTopic
</topic>
<producerGroup>
yourLogGroup
</producerGroup>
<nameServerAddress>
yourRocketmqNameserverAddress
</nameServerAddress>
<layout>
<pattern>
%date %p %t - %m%n
</pattern>
</layout>
</appender>
<appender
name=
"mqAsyncAppender1"
class=
"ch.qos.logback.classic.AsyncAppender"
>
<queueSize>
1024
</queueSize>
<discardingThreshold>
80
</discardingThreshold>
<maxFlushTime>
2000
</maxFlushTime>
<neverBlock>
true
</neverBlock>
<appender-ref
ref=
"mqAppender1"
/>
</appender>
```
8
OpenMessaging样例
---------------
[OpenMessaging](https://www.google.com/url?q=http://openmessaging.cloud/&sa=D&ust=1546524111089000)旨在建立消息和流处理规范,以为金融、电子商务、物联网和大数据领域提供通用框架及工业级指导方案。在分布式异构环境中,设计原则是面向云、简单、灵活和独立于语言。符合这些规范将帮助企业方便的开发跨平台和操作系统的异构消息传递应用程序。提供了openmessaging-api 0.3.0-alpha的部分实现,下面的示例演示如何基于OpenMessaging访问RocketMQ。
###
1、
OMSProducer样例
###
8.1
OMSProducer样例
下面的示例演示如何在同步、异步或单向传输中向RocketMQ代理发送消息。
...
...
@@ -867,7 +862,7 @@ public class SimpleProducer {
}
```
###
2、
OMSPullConsumer
###
8.2
OMSPullConsumer
用OMS PullConsumer 来从指定的队列中拉取消息
...
...
@@ -920,7 +915,7 @@ public class SimplePullConsumer {
}
```
###
3、
OMSPushConsumer
###
8.3
OMSPushConsumer
以下示范如何将 OMS PushConsumer 添加到指定的队列,并通过 MessageListener 消费这些消息。
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录