Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
92b8cf97
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
92b8cf97
编写于
3月 30, 2020
作者:
M
MountainOne
提交者:
GitHub
3月 30, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ISSUE #1505]Fix the ListSplitter of batch sample in docs
上级
30ad1723
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
78 addition
and
69 deletion
+78
-69
docs/cn/RocketMQ_Example.md
docs/cn/RocketMQ_Example.md
+46
-42
docs/en/Example_Batch.md
docs/en/Example_Batch.md
+32
-27
未找到文件。
docs/cn/RocketMQ_Example.md
浏览文件 @
92b8cf97
...
...
@@ -500,48 +500,52 @@ try {
复杂度只有当你发送大批量时才会增长,你可能不确定它是否超过了大小限制(4MB)。这时候你最好把你的消息列表分割一下:
```
java
public
class
ListSplitter
implements
Iterator
<
List
<
Message
>>
{
private
final
int
SIZE_LIMIT
=
1024
*
1024
*
4
;
private
final
List
<
Message
>
messages
;
private
int
currIndex
;
public
ListSplitter
(
List
<
Message
>
messages
)
{
this
.
messages
=
messages
;
}
@Override
public
boolean
hasNext
()
{
return
currIndex
<
messages
.
size
();
}
@Override
public
List
<
Message
>
next
()
{
int
nextIndex
=
currIndex
;
int
totalSize
=
0
;
for
(;
nextIndex
<
messages
.
size
();
nextIndex
++)
{
Message
message
=
messages
.
get
(
nextIndex
);
int
tmpSize
=
message
.
getTopic
().
length
()
+
message
.
getBody
().
length
;
Map
<
String
,
String
>
properties
=
message
.
getProperties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
properties
.
entrySet
())
{
tmpSize
+=
entry
.
getKey
().
length
()
+
entry
.
getValue
().
length
();
}
tmpSize
=
tmpSize
+
20
;
// 增加日志的开销20字节
if
(
tmpSize
>
SIZE_LIMIT
)
{
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if
(
nextIndex
-
currIndex
==
0
)
{
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex
++;
}
break
;
}
if
(
tmpSize
+
totalSize
>
SIZE_LIMIT
)
{
break
;
}
else
{
totalSize
+=
tmpSize
;
}
}
List
<
Message
>
subList
=
messages
.
subList
(
currIndex
,
nextIndex
);
currIndex
=
nextIndex
;
return
subList
;
}
public
class
ListSplitter
implements
Iterator
<
List
<
Message
>>
{
private
final
int
SIZE_LIMIT
=
1024
*
1024
*
4
;
private
final
List
<
Message
>
messages
;
private
int
currIndex
;
public
ListSplitter
(
List
<
Message
>
messages
)
{
this
.
messages
=
messages
;
}
@Override
public
boolean
hasNext
()
{
return
currIndex
<
messages
.
size
();
}
@Override
public
List
<
Message
>
next
()
{
int
startIndex
=
getStartIndex
();
int
nextIndex
=
startIndex
;
int
totalSize
=
0
;
for
(;
nextIndex
<
messages
.
size
();
nextIndex
++)
{
Message
message
=
messages
.
get
(
nextIndex
);
int
tmpSize
=
calcMessageSize
(
message
);
if
(
tmpSize
+
totalSize
>
SIZE_LIMIT
)
{
break
;
}
else
{
totalSize
+=
tmpSize
;
}
}
List
<
Message
>
subList
=
messages
.
subList
(
startIndex
,
nextIndex
);
currIndex
=
nextIndex
;
return
subList
;
}
private
int
getStartIndex
()
{
Message
currMessage
=
messages
.
get
(
currIndex
);
int
tmpSize
=
calcMessageSize
(
currMessage
);
while
(
tmpSize
>
SIZE_LIMIT
)
{
currIndex
+=
1
;
Message
message
=
messages
.
get
(
curIndex
);
tmpSize
=
calcMessageSize
(
message
);
}
return
currIndex
;
}
private
int
calcMessageSize
(
Message
message
)
{
int
tmpSize
=
message
.
getTopic
().
length
()
+
message
.
getBody
().
length
();
Map
<
String
,
String
>
properties
=
message
.
getProperties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
properties
.
entrySet
())
{
tmpSize
+=
entry
.
getKey
().
length
()
+
entry
.
getValue
().
length
();
}
tmpSize
=
tmpSize
+
20
;
// 增加⽇日志的开销20字节
return
tmpSize
;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter
splitter
=
new
ListSplitter
(
messages
);
...
...
docs/en/Example_Batch.md
浏览文件 @
92b8cf97
...
...
@@ -19,47 +19,52 @@ try {
### 2 Split into Lists
The complexity only grow when you send large batch and you may not sure if it exceeds the size limit (4MiB). At this time, you’d better split the lists:
```
java
public
class
ListSplitter
implements
Iterator
<
List
<
Message
>>
{
private
final
int
SIZE_LIMIT
=
10
00
*
1000
;
public
class
ListSplitter
implements
Iterator
<
List
<
Message
>>
{
private
final
int
SIZE_LIMIT
=
10
24
*
1024
*
4
;
private
final
List
<
Message
>
messages
;
private
int
currIndex
;
public
ListSplitter
(
List
<
Message
>
messages
)
{
this
.
messages
=
messages
;
public
ListSplitter
(
List
<
Message
>
messages
)
{
this
.
messages
=
messages
;
}
@Override
public
boolean
hasNext
()
{
return
currIndex
<
messages
.
size
();
return
currIndex
<
messages
.
size
();
}
@Override
public
List
<
Message
>
next
()
{
int
nextIndex
=
currIndex
;
@Override
public
List
<
Message
>
next
()
{
int
startIndex
=
getStartIndex
();
int
nextIndex
=
startIndex
;
int
totalSize
=
0
;
for
(;
nextIndex
<
messages
.
size
();
nextIndex
++)
{
Message
message
=
messages
.
get
(
nextIndex
);
int
tmpSize
=
message
.
getTopic
().
length
()
+
message
.
getBody
().
length
;
Map
<
String
,
String
>
properties
=
message
.
getProperties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
properties
.
entrySet
())
{
tmpSize
+=
entry
.
getKey
().
length
()
+
entry
.
getValue
().
length
();
}
tmpSize
=
tmpSize
+
20
;
//for log overhead
if
(
tmpSize
>
SIZE_LIMIT
)
{
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if
(
nextIndex
-
currIndex
==
0
)
{
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex
++;
}
break
;
}
Message
message
=
messages
.
get
(
nextIndex
);
int
tmpSize
=
calcMessageSize
(
message
);
if
(
tmpSize
+
totalSize
>
SIZE_LIMIT
)
{
break
;
break
;
}
else
{
totalSize
+=
tmpSize
;
totalSize
+=
tmpSize
;
}
}
List
<
Message
>
subList
=
messages
.
subList
(
currIndex
,
nextIndex
);
List
<
Message
>
subList
=
messages
.
subList
(
startIndex
,
nextIndex
);
currIndex
=
nextIndex
;
return
subList
;
}
private
int
getStartIndex
()
{
Message
currMessage
=
messages
.
get
(
currIndex
);
int
tmpSize
=
calcMessageSize
(
currMessage
);
while
(
tmpSize
>
SIZE_LIMIT
)
{
currIndex
+=
1
;
Message
message
=
messages
.
get
(
curIndex
);
tmpSize
=
calcMessageSize
(
message
);
}
return
currIndex
;
}
private
int
calcMessageSize
(
Message
message
)
{
int
tmpSize
=
message
.
getTopic
().
length
()
+
message
.
getBody
().
length
();
Map
<
String
,
String
>
properties
=
message
.
getProperties
();
for
(
Map
.
Entry
<
String
,
String
>
entry
:
properties
.
entrySet
())
{
tmpSize
+=
entry
.
getKey
().
length
()
+
entry
.
getValue
().
length
();
}
tmpSize
=
tmpSize
+
20
;
// Increase the log overhead by 20 bytes
return
tmpSize
;
}
}
// then you could split the large list into small ones:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录