Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
a0a2192c
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
269
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
a0a2192c
编写于
6月 25, 2021
作者:
G
Git_Yang
提交者:
GitHub
6月 25, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ISSUE #2873] add benchmark for batch message
Signed-off-by:
N
zhangyang
<
Git_Yang@163.com
>
上级
160c5772
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
429 addition
and
0 deletion
+429
-0
distribution/benchmark/batchproducer.sh
distribution/benchmark/batchproducer.sh
+18
-0
example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
.../org/apache/rocketmq/example/benchmark/BatchProducer.java
+411
-0
未找到文件。
distribution/benchmark/batchproducer.sh
0 → 100644
浏览文件 @
a0a2192c
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
sh ./runclass.sh org.apache.rocketmq.example.benchmark.BatchProducer
$@
&
example/src/main/java/org/apache/rocketmq/example/benchmark/BatchProducer.java
0 → 100644
浏览文件 @
a0a2192c
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.example.benchmark
;
import
java.io.UnsupportedEncodingException
;
import
java.util.ArrayList
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Random
;
import
java.util.Timer
;
import
java.util.TimerTask
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.cli.PosixParser
;
import
org.apache.commons.collections.CollectionUtils
;
import
org.apache.rocketmq.acl.common.AclClientRPCHook
;
import
org.apache.rocketmq.acl.common.SessionCredentials
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
public
class
BatchProducer
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
UnsupportedEncodingException
{
Options
options
=
ServerUtil
.
buildCommandlineOptions
(
new
Options
());
CommandLine
commandLine
=
ServerUtil
.
parseCmdLine
(
"benchmarkBatchProducer"
,
args
,
buildCommandlineOptions
(
options
),
new
PosixParser
());
if
(
null
==
commandLine
)
{
System
.
exit
(-
1
);
}
final
String
namesrv
=
getOptionValue
(
commandLine
,
'n'
,
"127.0.0.1:9876"
);
final
String
topic
=
getOptionValue
(
commandLine
,
't'
,
"BenchmarkTest"
);
final
int
threadCount
=
getOptionValue
(
commandLine
,
'w'
,
64
);
final
int
messageSize
=
getOptionValue
(
commandLine
,
's'
,
128
);
final
int
batchSize
=
getOptionValue
(
commandLine
,
'b'
,
16
);
final
boolean
keyEnable
=
getOptionValue
(
commandLine
,
'k'
,
false
);
final
int
propertySize
=
getOptionValue
(
commandLine
,
'p'
,
0
);
final
int
tagCount
=
getOptionValue
(
commandLine
,
'l'
,
0
);
final
boolean
msgTraceEnable
=
getOptionValue
(
commandLine
,
'm'
,
false
);
final
boolean
aclEnable
=
getOptionValue
(
commandLine
,
'a'
,
false
);
final
String
ak
=
getOptionValue
(
commandLine
,
'c'
,
"rocketmq2"
);
final
String
sk
=
getOptionValue
(
commandLine
,
'e'
,
"12346789"
);
System
.
out
.
printf
(
"topic: %s threadCount: %d messageSize: %d batchSize: %d keyEnable: %s propertySize: %d tagCount: %d traceEnable: %s aclEnable: %s%n"
,
topic
,
threadCount
,
messageSize
,
batchSize
,
keyEnable
,
propertySize
,
tagCount
,
msgTraceEnable
,
aclEnable
);
final
StatsBenchmarkBatchProducer
statsBenchmark
=
new
StatsBenchmarkBatchProducer
();
statsBenchmark
.
start
();
final
DefaultMQProducer
producer
=
initInstance
(
namesrv
,
msgTraceEnable
,
aclEnable
,
ak
,
sk
);
producer
.
start
();
final
InternalLogger
log
=
ClientLogger
.
getLog
();
final
ExecutorService
sendThreadPool
=
Executors
.
newFixedThreadPool
(
threadCount
);
for
(
int
i
=
0
;
i
<
threadCount
;
i
++)
{
sendThreadPool
.
execute
(
new
Runnable
()
{
@Override
public
void
run
()
{
while
(
true
)
{
List
<
Message
>
msgs
;
try
{
msgs
=
buildBathMessage
(
batchSize
,
messageSize
,
topic
);
}
catch
(
UnsupportedEncodingException
e
)
{
e
.
printStackTrace
();
return
;
}
if
(
CollectionUtils
.
isEmpty
(
msgs
))
{
return
;
}
try
{
long
beginTimestamp
=
System
.
currentTimeMillis
();
long
sendSucCount
=
statsBenchmark
.
getSendMessageSuccessCount
().
get
();
setKeys
(
keyEnable
,
msgs
,
String
.
valueOf
(
beginTimestamp
/
1000
));
setTags
(
tagCount
,
msgs
,
sendSucCount
);
setProperties
(
propertySize
,
msgs
);
SendResult
sendResult
=
producer
.
send
(
msgs
);
if
(
sendResult
.
getSendStatus
()
==
SendStatus
.
SEND_OK
)
{
statsBenchmark
.
getSendRequestSuccessCount
().
incrementAndGet
();
statsBenchmark
.
getSendMessageSuccessCount
().
addAndGet
(
msgs
.
size
());
}
else
{
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
statsBenchmark
.
getSendMessageFailedCount
().
addAndGet
(
msgs
.
size
());
}
long
currentRT
=
System
.
currentTimeMillis
()
-
beginTimestamp
;
statsBenchmark
.
getSendMessageSuccessTimeTotal
().
addAndGet
(
currentRT
);
long
prevMaxRT
=
statsBenchmark
.
getSendMessageMaxRT
().
get
();
while
(
currentRT
>
prevMaxRT
)
{
boolean
updated
=
statsBenchmark
.
getSendMessageMaxRT
().
compareAndSet
(
prevMaxRT
,
currentRT
);
if
(
updated
)
{
break
;
}
prevMaxRT
=
statsBenchmark
.
getSendMessageMaxRT
().
get
();
}
}
catch
(
RemotingException
e
)
{
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
statsBenchmark
.
getSendMessageFailedCount
().
addAndGet
(
msgs
.
size
());
log
.
error
(
"[BENCHMARK_PRODUCER] Send Exception"
,
e
);
try
{
Thread
.
sleep
(
3000
);
}
catch
(
InterruptedException
ignored
)
{
}
}
catch
(
InterruptedException
e
)
{
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
statsBenchmark
.
getSendMessageFailedCount
().
addAndGet
(
msgs
.
size
());
try
{
Thread
.
sleep
(
3000
);
}
catch
(
InterruptedException
e1
)
{
}
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
statsBenchmark
.
getSendMessageFailedCount
().
addAndGet
(
msgs
.
size
());
log
.
error
(
"[BENCHMARK_PRODUCER] Send Exception"
,
e
);
}
catch
(
MQClientException
e
)
{
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
statsBenchmark
.
getSendMessageFailedCount
().
addAndGet
(
msgs
.
size
());
log
.
error
(
"[BENCHMARK_PRODUCER] Send Exception"
,
e
);
}
catch
(
MQBrokerException
e
)
{
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
statsBenchmark
.
getSendMessageFailedCount
().
addAndGet
(
msgs
.
size
());
log
.
error
(
"[BENCHMARK_PRODUCER] Send Exception"
,
e
);
try
{
Thread
.
sleep
(
3000
);
}
catch
(
InterruptedException
ignored
)
{
}
}
}
}
});
}
}
public
static
Options
buildCommandlineOptions
(
final
Options
options
)
{
Option
opt
=
new
Option
(
"w"
,
"threadCount"
,
true
,
"Thread count, Default: 64"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"s"
,
"messageSize"
,
true
,
"Message Size, Default: 128"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"b"
,
"batchSize"
,
true
,
"Batch Size, Default: 16"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"k"
,
"keyEnable"
,
true
,
"Message Key Enable, Default: false"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"t"
,
"topic"
,
true
,
"Topic name, Default: BenchmarkTest"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"l"
,
"tagCount"
,
true
,
"Tag count, Default: 0"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"m"
,
"msgTraceEnable"
,
true
,
"Message Trace Enable, Default: false"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"a"
,
"aclEnable"
,
true
,
"Acl Enable, Default: false"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"c"
,
"accessKey"
,
true
,
"Acl Access Key, Default: rocketmq2"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"e"
,
"secretKey"
,
true
,
"Acl Secret Key, Default: 123456789"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"p"
,
"propertySize"
,
true
,
"Property Size, Default: 0"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"n"
,
"namesrv"
,
true
,
"name server, Default: 127.0.0.1:9876"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
return
options
;
}
private
static
String
getOptionValue
(
CommandLine
commandLine
,
char
key
,
String
defaultValue
)
{
if
(
commandLine
.
hasOption
(
key
))
{
return
commandLine
.
getOptionValue
(
key
).
trim
();
}
return
defaultValue
;
}
private
static
int
getOptionValue
(
CommandLine
commandLine
,
char
key
,
int
defaultValue
)
{
if
(
commandLine
.
hasOption
(
key
))
{
return
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
key
).
trim
());
}
return
defaultValue
;
}
private
static
boolean
getOptionValue
(
CommandLine
commandLine
,
char
key
,
boolean
defaultValue
)
{
if
(
commandLine
.
hasOption
(
key
))
{
return
Boolean
.
parseBoolean
(
commandLine
.
getOptionValue
(
key
).
trim
());
}
return
defaultValue
;
}
private
static
List
<
Message
>
buildBathMessage
(
int
batchSize
,
int
messageSize
,
String
topic
)
throws
UnsupportedEncodingException
{
List
<
Message
>
batchMessage
=
new
ArrayList
<>(
batchSize
);
for
(
int
i
=
0
;
i
<
batchSize
;
i
++)
{
Message
msg
=
new
Message
();
msg
.
setTopic
(
topic
);
StringBuilder
sb
=
new
StringBuilder
();
for
(
int
j
=
0
;
j
<
messageSize
;
j
+=
10
)
{
sb
.
append
(
"hello baby"
);
}
msg
.
setBody
(
sb
.
toString
().
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
));
batchMessage
.
add
(
msg
);
}
return
batchMessage
;
}
private
static
void
setKeys
(
boolean
keyEnable
,
List
<
Message
>
msgs
,
String
keys
)
{
if
(!
keyEnable
)
{
return
;
}
for
(
Message
msg
:
msgs
)
{
msg
.
setKeys
(
keys
);
}
}
private
static
void
setTags
(
int
tagCount
,
List
<
Message
>
msgs
,
long
startTagId
)
{
if
(
tagCount
<=
0
)
{
return
;
}
long
tagId
=
startTagId
%
tagCount
;
for
(
Message
msg
:
msgs
)
{
msg
.
setTags
(
String
.
format
(
"tag%d"
,
tagId
++));
}
}
private
static
void
setProperties
(
int
propertySize
,
List
<
Message
>
msgs
)
{
if
(
propertySize
<=
0
)
{
return
;
}
for
(
Message
msg
:
msgs
)
{
if
(
msg
.
getProperties
()
!=
null
)
{
msg
.
getProperties
().
clear
();
}
int
startValue
=
(
new
Random
(
System
.
currentTimeMillis
())).
nextInt
(
100
);
int
size
=
0
;
for
(
int
i
=
0
;
;
i
++)
{
String
prop1
=
"prop"
+
i
,
prop1V
=
"hello"
+
startValue
;
msg
.
putUserProperty
(
prop1
,
prop1V
);
size
+=
prop1
.
length
()
+
prop1V
.
length
();
if
(
size
>
propertySize
)
{
break
;
}
startValue
++;
}
}
}
private
static
DefaultMQProducer
initInstance
(
String
namesrv
,
boolean
traceEnable
,
boolean
aclEnable
,
String
ak
,
String
sk
)
{
RPCHook
rpcHook
=
aclEnable
?
new
AclClientRPCHook
(
new
SessionCredentials
(
ak
,
sk
))
:
null
;
final
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"benchmark_batch_producer"
,
rpcHook
,
traceEnable
,
null
);
producer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
producer
.
setNamesrvAddr
(
namesrv
);
producer
.
setCompressMsgBodyOverHowmuch
(
Integer
.
MAX_VALUE
);
return
producer
;
}
}
class
StatsBenchmarkBatchProducer
{
private
final
AtomicLong
sendRequestSuccessCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendRequestFailedCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageSuccessTimeTotal
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageMaxRT
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageSuccessCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageFailedCount
=
new
AtomicLong
(
0L
);
private
final
Timer
timer
=
new
Timer
(
"BenchmarkTimerThread"
,
true
);
private
final
LinkedList
<
Long
[]>
snapshotList
=
new
LinkedList
<>();
public
Long
[]
createSnapshot
()
{
Long
[]
snap
=
new
Long
[]
{
System
.
currentTimeMillis
(),
this
.
sendRequestSuccessCount
.
get
(),
this
.
sendRequestFailedCount
.
get
(),
this
.
sendMessageSuccessCount
.
get
(),
this
.
sendMessageFailedCount
.
get
(),
this
.
sendMessageSuccessTimeTotal
.
get
(),
};
return
snap
;
}
public
AtomicLong
getSendRequestSuccessCount
()
{
return
sendRequestSuccessCount
;
}
public
AtomicLong
getSendRequestFailedCount
()
{
return
sendRequestFailedCount
;
}
public
AtomicLong
getSendMessageSuccessTimeTotal
()
{
return
sendMessageSuccessTimeTotal
;
}
public
AtomicLong
getSendMessageMaxRT
()
{
return
sendMessageMaxRT
;
}
public
AtomicLong
getSendMessageSuccessCount
()
{
return
sendMessageSuccessCount
;
}
public
AtomicLong
getSendMessageFailedCount
()
{
return
sendMessageFailedCount
;
}
public
void
start
()
{
timer
.
scheduleAtFixedRate
(
new
TimerTask
()
{
@Override
public
void
run
()
{
snapshotList
.
addLast
(
createSnapshot
());
if
(
snapshotList
.
size
()
>
10
)
{
snapshotList
.
removeFirst
();
}
}
},
1000
,
1000
);
timer
.
scheduleAtFixedRate
(
new
TimerTask
()
{
private
void
printStats
()
{
if
(
snapshotList
.
size
()
>=
10
)
{
Long
[]
begin
=
snapshotList
.
getFirst
();
Long
[]
end
=
snapshotList
.
getLast
();
final
long
sendTps
=
(
long
)
(((
end
[
1
]
-
begin
[
1
])
/
(
double
)
(
end
[
0
]
-
begin
[
0
]))
*
1000L
);
final
long
sendMps
=
(
long
)
(((
end
[
3
]
-
begin
[
3
])
/
(
double
)
(
end
[
0
]
-
begin
[
0
]))
*
1000L
);
final
double
averageRT
=
(
end
[
5
]
-
begin
[
5
])
/
(
double
)
(
end
[
1
]
-
begin
[
1
]);
final
double
averageMsgRT
=
(
end
[
5
]
-
begin
[
5
])
/
(
double
)
(
end
[
3
]
-
begin
[
3
]);
System
.
out
.
printf
(
"Current Time: %s Send TPS: %d Send MPS: %d Max RT(ms): %d Average RT(ms): %7.3f Average Message RT(ms): %7.3f Send Failed: %d Send Message Failed: %d%n"
,
System
.
currentTimeMillis
(),
sendTps
,
sendMps
,
getSendMessageMaxRT
().
get
(),
averageRT
,
averageMsgRT
,
end
[
2
],
end
[
4
]);
}
}
@Override
public
void
run
()
{
try
{
this
.
printStats
();
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
}
}
},
10000
,
10000
);
}
public
void
shutdown
()
{
timer
.
cancel
();
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录