Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
9da19d2e
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
9da19d2e
编写于
12月 20, 2019
作者:
H
huangli
提交者:
rongtongjin
1月 20, 2020
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
optimise benchmark consumer, add consume fail rate option
上级
922da666
变更
1
显示空白变更内容
内联
并排
Showing
1 changed file
with
32 addition
and
9 deletion
+32
-9
example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
.../java/org/apache/rocketmq/example/benchmark/Consumer.java
+32
-9
未找到文件。
example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
浏览文件 @
9da19d2e
...
...
@@ -22,7 +22,9 @@ import java.util.LinkedList;
import
java.util.List
;
import
java.util.Timer
;
import
java.util.TimerTask
;
import
java.util.concurrent.ThreadLocalRandom
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.Options
;
...
...
@@ -49,15 +51,16 @@ public class Consumer {
final
String
topic
=
commandLine
.
hasOption
(
't'
)
?
commandLine
.
getOptionValue
(
't'
).
trim
()
:
"BenchmarkTest"
;
final
String
groupPrefix
=
commandLine
.
hasOption
(
'g'
)
?
commandLine
.
getOptionValue
(
'g'
).
trim
()
:
"benchmark_consumer"
;
final
String
is
Pre
fixEnable
=
commandLine
.
hasOption
(
'p'
)
?
commandLine
.
getOptionValue
(
'p'
).
trim
()
:
"true"
;
final
String
is
Suf
fixEnable
=
commandLine
.
hasOption
(
'p'
)
?
commandLine
.
getOptionValue
(
'p'
).
trim
()
:
"true"
;
final
String
filterType
=
commandLine
.
hasOption
(
'f'
)
?
commandLine
.
getOptionValue
(
'f'
).
trim
()
:
null
;
final
String
expression
=
commandLine
.
hasOption
(
'e'
)
?
commandLine
.
getOptionValue
(
'e'
).
trim
()
:
null
;
final
double
failRate
=
commandLine
.
hasOption
(
'r'
)
?
Double
.
parseDouble
(
commandLine
.
getOptionValue
(
'r'
).
trim
())
:
0.0
;
String
group
=
groupPrefix
;
if
(
Boolean
.
parseBoolean
(
is
Pre
fixEnable
))
{
group
=
groupPrefix
+
"_"
+
Long
.
toString
(
System
.
currentTimeMillis
()
%
100
);
if
(
Boolean
.
parseBoolean
(
is
Suf
fixEnable
))
{
group
=
groupPrefix
+
"_"
+
(
System
.
currentTimeMillis
()
%
100
);
}
System
.
out
.
printf
(
"topic: %s, group: %s,
prefix: %s, filterType: %s, expression: %s%n"
,
topic
,
group
,
isPre
fixEnable
,
filterType
,
expression
);
System
.
out
.
printf
(
"topic: %s, group: %s,
suffix: %s, filterType: %s, expression: %s%n"
,
topic
,
group
,
isSuf
fixEnable
,
filterType
,
expression
);
final
StatsBenchmarkConsumer
statsBenchmarkConsumer
=
new
StatsBenchmarkConsumer
();
...
...
@@ -85,9 +88,15 @@ public class Consumer {
(
long
)
(((
end
[
1
]
-
begin
[
1
])
/
(
double
)
(
end
[
0
]
-
begin
[
0
]))
*
1000L
);
final
double
averageB2CRT
=
(
end
[
2
]
-
begin
[
2
])
/
(
double
)
(
end
[
1
]
-
begin
[
1
]);
final
double
averageS2CRT
=
(
end
[
3
]
-
begin
[
3
])
/
(
double
)
(
end
[
1
]
-
begin
[
1
]);
final
long
failCount
=
end
[
4
]
-
begin
[
4
];
final
long
b2cMax
=
statsBenchmarkConsumer
.
getBorn2ConsumerMaxRT
().
get
();
final
long
s2cMax
=
statsBenchmarkConsumer
.
getStore2ConsumerMaxRT
().
get
();
statsBenchmarkConsumer
.
getBorn2ConsumerMaxRT
().
set
(
0
);
statsBenchmarkConsumer
.
getStore2ConsumerMaxRT
().
set
(
0
);
System
.
out
.
printf
(
"
Consume TPS: %d Average(B2C) RT: %7.3f Average
(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n"
,
consumeTps
,
averageB2CRT
,
averageS2CRT
,
end
[
4
],
end
[
5
]
System
.
out
.
printf
(
"
TPS: %d FAIL: %d AVG(B2C) RT: %7.3f AVG
(S2C) RT: %7.3f MAX(B2C) RT: %d MAX(S2C) RT: %d%n"
,
consumeTps
,
failCount
,
averageB2CRT
,
averageS2CRT
,
b2cMax
,
s2cMax
);
}
}
...
...
@@ -144,8 +153,13 @@ public class Consumer {
compareAndSetMax
(
statsBenchmarkConsumer
.
getStore2ConsumerMaxRT
(),
store2ConsumerRT
);
if
(
ThreadLocalRandom
.
current
().
nextDouble
()
<
failRate
)
{
statsBenchmarkConsumer
.
getFailCount
().
incrementAndGet
();
return
ConsumeConcurrentlyStatus
.
RECONSUME_LATER
;
}
else
{
return
ConsumeConcurrentlyStatus
.
CONSUME_SUCCESS
;
}
}
});
consumer
.
start
();
...
...
@@ -174,6 +188,10 @@ public class Consumer {
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"r"
,
"fail rate"
,
true
,
"consumer fail rate, default 0"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
return
options
;
}
...
...
@@ -200,14 +218,15 @@ class StatsBenchmarkConsumer {
private
final
AtomicLong
store2ConsumerMaxRT
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
failCount
=
new
AtomicLong
(
0L
);
public
Long
[]
createSnapshot
()
{
Long
[]
snap
=
new
Long
[]
{
System
.
currentTimeMillis
(),
this
.
receiveMessageTotalCount
.
get
(),
this
.
born2ConsumerTotalRT
.
get
(),
this
.
store2ConsumerTotalRT
.
get
(),
this
.
born2ConsumerMaxRT
.
get
(),
this
.
store2ConsumerMaxRT
.
get
(),
this
.
failCount
.
get
()
};
return
snap
;
...
...
@@ -232,4 +251,8 @@ class StatsBenchmarkConsumer {
public
AtomicLong
getStore2ConsumerMaxRT
()
{
return
store2ConsumerMaxRT
;
}
public
AtomicLong
getFailCount
()
{
return
failCount
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录