Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
0cc5ad2d
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
270
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
0cc5ad2d
编写于
6月 23, 2021
作者:
C
CuCu
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add Chinese version of Example_Batch
上级
278d3285
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
82 addition
and
0 deletion
+82
-0
docs/cn/Example_Batch.md
docs/cn/Example_Batch.md
+82
-0
未找到文件。
docs/cn/Example_Batch.md
0 → 100644
浏览文件 @
0cc5ad2d
# 批量消息发送
批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。
### 1 发送批量消息
如果你一次只发送不超过 4MiB 的消息,使用批处理很容易:
```
java
String
topic
=
"BatchTest"
;
List
<
Message
>
messages
=
new
ArrayList
<>();
messages
.
add
(
new
Message
(
topic
,
"TagA"
,
"OrderID001"
,
"Hello world 0"
.
getBytes
()));
messages
.
add
(
new
Message
(
topic
,
"TagA"
,
"OrderID002"
,
"Hello world 1"
.
getBytes
()));
messages
.
add
(
new
Message
(
topic
,
"TagA"
,
"OrderID003"
,
"Hello world 2"
.
getBytes
()));
try
{
producer
.
send
(
messages
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
//handle the error
}
```
### 2 拆分
当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息:
```
java
public
class
ListSplitter
implements
Iterator
<
List
<
Message
>>
{
private
final
int
SIZE_LIMIT
=
1024
*
1024
*
4
;
private
final
List
<
Message
>
messages
;
private
int
currIndex
;
public
ListSplitter
(
List
<
Message
>
messages
)
{
this
.
messages
=
messages
;
}
@Override
public
boolean
hasNext
()
{
return
currIndex
<
messages
.
size
();
}
@Override
public
List
<
Message
>
next
()
{
int
startIndex
=
getStartIndex
();
int
nextIndex
=
startIndex
;
int
totalSize
=
0
;
for
(;
nextIndex
<
messages
.
size
();
nextIndex
++)
{
Message
message
=
messages
.
get
(
nextIndex
);
int
tmpSize
=
calcMessageSize
(
message
);
if
(
tmpSize
+
totalSize
>
SIZE_LIMIT
)
{
break
;
}
else
{
totalSize
+=
tmpSize
;
}
}
List
<
Message
>
subList
=
messages
.
subList
(
startIndex
,
nextIndex
);
currIndex
=
nextIndex
;
return
subList
;
}
private
int
getStartIndex
()
{
Message
currMessage
=
messages
.
get
(
currIndex
);
int
tmpSize
=
calcMessageSize
(
currMessage
);
while
(
tmpSize
>
SIZE_LIMIT
)
{
currIndex
+=
1
;
Message
message
=
messages
.
get
(
curIndex
);
tmpSize
=
calcMessageSize
(
message
);
}
return
currIndex
;
}
private
int
calcMessageSize
(
Message
message
)
{
int
tmpSize
=
message
.
getTopic
().
length
()
+
message
.
getBody
().
length
();
Map
<
String
,
String
>
properties
=
message
.
getProperties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
properties
.
entrySet
())
{
tmpSize
+=
entry
.
getKey
().
length
()
+
entry
.
getValue
().
length
();
}
tmpSize
=
tmpSize
+
20
;
// Increase the log overhead by 20 bytes
return
tmpSize
;
}
}
// then you could split the large list into small ones:
ListSplitter
splitter
=
new
ListSplitter
(
messages
);
while
(
splitter
.
hasNext
())
{
try
{
List
<
Message
>
listItem
=
splitter
.
next
();
producer
.
send
(
listItem
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
// handle the error
}
}
```
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录