Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
34c34549
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
268
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
34c34549
编写于
12月 06, 2019
作者:
H
huangli
提交者:
Heng Du
12月 06, 2019
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Optimise tx benchmark producer (#1628)
上级
44569e4d
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
266 addition
and
101 deletion
+266
-101
example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
...pache/rocketmq/example/benchmark/TransactionProducer.java
+264
-100
srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
...src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
+2
-1
未找到文件。
example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
浏览文件 @
34c34549
...
@@ -18,11 +18,19 @@
...
@@ -18,11 +18,19 @@
package
org.apache.rocketmq.example.benchmark
;
package
org.apache.rocketmq.example.benchmark
;
import
java.io.UnsupportedEncodingException
;
import
java.io.UnsupportedEncodingException
;
import
java.nio.ByteBuffer
;
import
java.text.SimpleDateFormat
;
import
java.util.ArrayList
;
import
java.util.Date
;
import
java.util.LinkedHashMap
;
import
java.util.LinkedList
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Timer
;
import
java.util.Timer
;
import
java.util.TimerTask
;
import
java.util.TimerTask
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ThreadLocalRandom
;
import
java.util.concurrent.atomic.AtomicLong
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.CommandLine
;
...
@@ -30,35 +38,44 @@ import org.apache.commons.cli.Option;
...
@@ -30,35 +38,44 @@ import org.apache.commons.cli.Option;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.cli.PosixParser
;
import
org.apache.commons.cli.PosixParser
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.LocalTransactionExecuter
;
import
org.apache.rocketmq.client.producer.LocalTransactionState
;
import
org.apache.rocketmq.client.producer.LocalTransactionState
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.client.producer.TransactionCheckListener
;
import
org.apache.rocketmq.client.producer.SendStatus
;
import
org.apache.rocketmq.client.producer.TransactionListener
;
import
org.apache.rocketmq.client.producer.TransactionMQProducer
;
import
org.apache.rocketmq.client.producer.TransactionMQProducer
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.apache.rocketmq.srvutil.ServerUtil
;
public
class
TransactionProducer
{
public
class
TransactionProducer
{
private
static
final
long
START_TIME
=
System
.
currentTimeMillis
();
private
static
final
AtomicLong
MSG_COUNT
=
new
AtomicLong
(
0
);
//broker max check times should less than this value
static
final
int
MAX_CHECK_RESULT_IN_MSG
=
20
;
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
UnsupportedEncodingException
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
UnsupportedEncodingException
{
Options
options
=
ServerUtil
.
buildCommandlineOptions
(
new
Options
());
Options
options
=
ServerUtil
.
buildCommandlineOptions
(
new
Options
());
CommandLine
commandLine
=
ServerUtil
.
parseCmdLine
(
"TransactionProducer"
,
args
,
buildCommandlineOptions
(
options
),
new
PosixParser
());
CommandLine
commandLine
=
ServerUtil
.
parseCmdLine
(
"TransactionProducer"
,
args
,
buildCommandlineOptions
(
options
),
new
PosixParser
());
TxSendConfig
config
=
new
TxSendConfig
();
final
String
topic
=
commandLine
.
hasOption
(
't'
)
?
commandLine
.
getOptionValue
(
't'
).
trim
()
:
"BenchmarkTest"
;
config
.
topic
=
commandLine
.
hasOption
(
't'
)
?
commandLine
.
getOptionValue
(
't'
).
trim
()
:
"BenchmarkTest"
;
final
int
threadCount
=
commandLine
.
hasOption
(
'w'
)
?
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
'w'
))
:
32
;
config
.
threadCount
=
commandLine
.
hasOption
(
'w'
)
?
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
'w'
))
:
32
;
final
int
messageSize
=
commandLine
.
hasOption
(
's'
)
?
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
's'
))
:
2048
;
config
.
messageSize
=
commandLine
.
hasOption
(
's'
)
?
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
's'
))
:
2048
;
final
boolean
ischeck
=
commandLine
.
hasOption
(
'c'
)
?
Boolean
.
parseBoolean
(
commandLine
.
getOptionValue
(
'c'
))
:
false
;
config
.
sendRollbackRate
=
commandLine
.
hasOption
(
"sr"
)
?
Double
.
parseDouble
(
commandLine
.
getOptionValue
(
"sr"
))
:
0.0
;
final
boolean
ischeckffalse
=
commandLine
.
hasOption
(
'r'
)
?
Boolean
.
parseBoolean
(
commandLine
.
getOptionValue
(
'r'
))
:
true
;
config
.
sendUnknownRate
=
commandLine
.
hasOption
(
"su"
)
?
Double
.
parseDouble
(
commandLine
.
getOptionValue
(
"su"
))
:
0.0
;
config
.
checkRollbackRate
=
commandLine
.
hasOption
(
"cr"
)
?
Double
.
parseDouble
(
commandLine
.
getOptionValue
(
"cr"
))
:
0.0
;
final
ExecutorService
sendThreadPool
=
Executors
.
newFixedThreadPool
(
threadCount
);
config
.
checkUnknownRate
=
commandLine
.
hasOption
(
"cu"
)
?
Double
.
parseDouble
(
commandLine
.
getOptionValue
(
"cu"
))
:
0.0
;
config
.
batchId
=
commandLine
.
hasOption
(
"b"
)
?
Long
.
parseLong
(
commandLine
.
getOptionValue
(
"b"
))
:
System
.
currentTimeMillis
();
config
.
sendInterval
=
commandLine
.
hasOption
(
"i"
)
?
Integer
.
parseInt
(
commandLine
.
getOptionValue
(
"i"
))
:
0
;
final
ExecutorService
sendThreadPool
=
Executors
.
newFixedThreadPool
(
config
.
threadCount
);
final
StatsBenchmarkTProducer
statsBenchmark
=
new
StatsBenchmarkTProducer
();
final
StatsBenchmarkTProducer
statsBenchmark
=
new
StatsBenchmarkTProducer
();
final
Timer
timer
=
new
Timer
(
"BenchmarkTimerThread"
,
true
);
final
Timer
timer
=
new
Timer
(
"BenchmarkTimerThread"
,
true
);
final
LinkedList
<
Long
[]>
snapshotList
=
new
LinkedList
<
Long
[]
>();
final
LinkedList
<
Snapshot
>
snapshotList
=
new
LinkedList
<
>();
timer
.
scheduleAtFixedRate
(
new
TimerTask
()
{
timer
.
scheduleAtFixedRate
(
new
TimerTask
()
{
@Override
@Override
...
@@ -73,16 +90,24 @@ public class TransactionProducer {
...
@@ -73,16 +90,24 @@ public class TransactionProducer {
timer
.
scheduleAtFixedRate
(
new
TimerTask
()
{
timer
.
scheduleAtFixedRate
(
new
TimerTask
()
{
private
void
printStats
()
{
private
void
printStats
()
{
if
(
snapshotList
.
size
()
>=
10
)
{
if
(
snapshotList
.
size
()
>=
10
)
{
Long
[]
begin
=
snapshotList
.
getFirst
();
Snapshot
begin
=
snapshotList
.
getFirst
();
Long
[]
end
=
snapshotList
.
getLast
();
Snapshot
end
=
snapshotList
.
getLast
();
final
long
sendTps
=
final
long
sendCount
=
(
end
.
sendRequestSuccessCount
-
begin
.
sendRequestSuccessCount
)
(
long
)
(((
end
[
3
]
-
begin
[
3
])
/
(
double
)
(
end
[
0
]
-
begin
[
0
]))
*
1000L
);
+
(
end
.
sendRequestFailedCount
-
begin
.
sendRequestFailedCount
);
final
double
averageRT
=
(
end
[
5
]
-
begin
[
5
])
/
(
double
)
(
end
[
3
]
-
begin
[
3
]);
final
long
sendTps
=
(
sendCount
*
1000L
)
/
(
end
.
endTime
-
begin
.
endTime
);
final
double
averageRT
=
(
end
.
sendMessageTimeTotal
-
begin
.
sendMessageTimeTotal
)
/
(
double
)
(
end
.
sendRequestSuccessCount
-
begin
.
sendRequestSuccessCount
);
final
long
failCount
=
end
.
sendRequestFailedCount
-
begin
.
sendRequestFailedCount
;
final
long
checkCount
=
end
.
checkCount
-
begin
.
checkCount
;
final
long
unexpectedCheck
=
end
.
unexpectedCheckCount
-
begin
.
unexpectedCheckCount
;
final
long
dupCheck
=
end
.
duplicatedCheck
-
begin
.
duplicatedCheck
;
System
.
out
.
printf
(
System
.
out
.
printf
(
"Send TPS: %d Max RT: %d Average RT: %7.3f Send Failed: %d Response Failed: %d transaction checkCount: %d %n"
,
"Send TPS:%5d Max RT:%5d AVG RT:%3.1f Send Failed: %d check: %d unexpectedCheck: %d duplicatedCheck: %d %n"
,
sendTps
,
statsBenchmark
.
getSendMessageMaxRT
().
get
(),
averageRT
,
end
[
2
],
end
[
4
],
end
[
6
]);
sendTps
,
statsBenchmark
.
getSendMessageMaxRT
().
get
(),
averageRT
,
failCount
,
checkCount
,
unexpectedCheck
,
dupCheck
);
statsBenchmark
.
getSendMessageMaxRT
().
set
(
0
);
}
}
}
}
...
@@ -96,11 +121,10 @@ public class TransactionProducer {
...
@@ -96,11 +121,10 @@ public class TransactionProducer {
}
}
},
10000
,
10000
);
},
10000
,
10000
);
final
TransactionCheckListener
transactionCheckListener
=
final
TransactionListener
transactionCheckListener
=
new
TransactionListenerImpl
(
statsBenchmark
,
config
);
new
TransactionCheckListenerBImpl
(
ischeckffalse
,
statsBenchmark
);
final
TransactionMQProducer
producer
=
new
TransactionMQProducer
(
"benchmark_transaction_producer"
);
final
TransactionMQProducer
producer
=
new
TransactionMQProducer
(
"benchmark_transaction_producer"
);
producer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
producer
.
setInstanceName
(
Long
.
toString
(
System
.
currentTimeMillis
()));
producer
.
setTransaction
Check
Listener
(
transactionCheckListener
);
producer
.
setTransactionListener
(
transactionCheckListener
);
producer
.
setDefaultTopicQueueNums
(
1000
);
producer
.
setDefaultTopicQueueNums
(
1000
);
if
(
commandLine
.
hasOption
(
'n'
))
{
if
(
commandLine
.
hasOption
(
'n'
))
{
String
ns
=
commandLine
.
getOptionValue
(
'n'
);
String
ns
=
commandLine
.
getOptionValue
(
'n'
);
...
@@ -108,60 +132,87 @@ public class TransactionProducer {
...
@@ -108,60 +132,87 @@ public class TransactionProducer {
}
}
producer
.
start
();
producer
.
start
();
final
TransactionExecuterBImpl
tranExecuter
=
new
TransactionExecuterBImpl
(
ischeck
);
for
(
int
i
=
0
;
i
<
config
.
threadCount
;
i
++)
{
for
(
int
i
=
0
;
i
<
threadCount
;
i
++)
{
sendThreadPool
.
execute
(
new
Runnable
()
{
sendThreadPool
.
execute
(
new
Runnable
()
{
@Override
@Override
public
void
run
()
{
public
void
run
()
{
while
(
true
)
{
while
(
true
)
{
try
{
boolean
success
=
false
;
// Thread.sleep(1000);
final
long
beginTimestamp
=
System
.
currentTimeMillis
();
final
long
beginTimestamp
=
System
.
currentTimeMillis
();
try
{
SendResult
sendResult
=
SendResult
sendResult
=
producer
.
sendMessageInTransaction
(
buildMessage
(
messageSize
,
topic
),
tranExecuter
,
null
);
producer
.
sendMessageInTransaction
(
buildMessage
(
config
),
null
);
if
(
sendResult
!=
null
)
{
success
=
sendResult
!=
null
&&
sendResult
.
getSendStatus
()
==
SendStatus
.
SEND_OK
;
statsBenchmark
.
getSendRequestSuccessCount
().
incrementAndGet
();
}
catch
(
Throwable
e
)
{
statsBenchmark
.
getReceiveResponseSuccessCount
().
incrementAndGet
();
success
=
false
;
}
}
finally
{
final
long
currentRT
=
System
.
currentTimeMillis
()
-
beginTimestamp
;
final
long
currentRT
=
System
.
currentTimeMillis
()
-
beginTimestamp
;
statsBenchmark
.
getSendMessage
Success
TimeTotal
().
addAndGet
(
currentRT
);
statsBenchmark
.
getSendMessageTimeTotal
().
addAndGet
(
currentRT
);
long
prevMaxRT
=
statsBenchmark
.
getSendMessageMaxRT
().
get
();
long
prevMaxRT
=
statsBenchmark
.
getSendMessageMaxRT
().
get
();
while
(
currentRT
>
prevMaxRT
)
{
while
(
currentRT
>
prevMaxRT
)
{
boolean
updated
=
boolean
updated
=
statsBenchmark
.
getSendMessageMaxRT
()
statsBenchmark
.
getSendMessageMaxRT
().
compareAndSet
(
prevMaxRT
,
.
compareAndSet
(
prevMaxRT
,
currentRT
);
currentRT
);
if
(
updated
)
if
(
updated
)
break
;
break
;
prevMaxRT
=
statsBenchmark
.
getSendMessageMaxRT
().
get
();
prevMaxRT
=
statsBenchmark
.
getSendMessageMaxRT
().
get
();
}
}
}
catch
(
MQClientException
e
)
{
if
(
success
)
{
statsBenchmark
.
getSendRequestSuccessCount
().
incrementAndGet
();
}
else
{
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
statsBenchmark
.
getSendRequestFailedCount
().
incrementAndGet
();
}
}
if
(
config
.
sendInterval
>
0
)
{
try
{
Thread
.
sleep
(
config
.
sendInterval
);
}
catch
(
InterruptedException
e
)
{
}
}
}
}
}
}
}
});
});
}
}
}
}
private
static
Message
buildMessage
(
final
int
messageSize
,
String
topic
)
{
private
static
Message
buildMessage
(
TxSendConfig
config
)
{
try
{
byte
[]
bs
=
new
byte
[
config
.
messageSize
];
Message
msg
=
new
Message
();
ThreadLocalRandom
r
=
ThreadLocalRandom
.
current
();
msg
.
setTopic
(
topic
);
r
.
nextBytes
(
bs
);
ByteBuffer
buf
=
ByteBuffer
.
wrap
(
bs
);
buf
.
putLong
(
config
.
batchId
);
long
sendMachineId
=
START_TIME
<<
32
;
long
msgId
=
sendMachineId
|
MSG_COUNT
.
getAndIncrement
();
buf
.
putLong
(
msgId
);
StringBuilder
sb
=
new
StringBuilder
();
// save send tx result in message
for
(
int
i
=
0
;
i
<
messageSize
;
i
+=
10
)
{
if
(
r
.
nextDouble
()
<
config
.
sendRollbackRate
)
{
sb
.
append
(
"hello baby"
);
buf
.
put
((
byte
)
LocalTransactionState
.
ROLLBACK_MESSAGE
.
ordinal
());
}
else
if
(
r
.
nextDouble
()
<
config
.
sendUnknownRate
)
{
buf
.
put
((
byte
)
LocalTransactionState
.
UNKNOW
.
ordinal
());
}
else
{
buf
.
put
((
byte
)
LocalTransactionState
.
COMMIT_MESSAGE
.
ordinal
());
}
}
msg
.
setBody
(
sb
.
toString
().
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
));
return
msg
;
// save check tx result in message
}
catch
(
UnsupportedEncodingException
e
)
{
for
(
int
i
=
0
;
i
<
MAX_CHECK_RESULT_IN_MSG
;
i
++)
{
throw
new
RuntimeException
(
e
);
if
(
r
.
nextDouble
()
<
config
.
checkRollbackRate
)
{
buf
.
put
((
byte
)
LocalTransactionState
.
ROLLBACK_MESSAGE
.
ordinal
());
}
else
if
(
r
.
nextDouble
()
<
config
.
checkUnknownRate
)
{
buf
.
put
((
byte
)
LocalTransactionState
.
UNKNOW
.
ordinal
());
}
else
{
buf
.
put
((
byte
)
LocalTransactionState
.
COMMIT_MESSAGE
.
ordinal
());
}
}
}
}
Message
msg
=
new
Message
();
msg
.
setTopic
(
config
.
topic
);
msg
.
setBody
(
bs
);
return
msg
;
}
public
static
Options
buildCommandlineOptions
(
final
Options
options
)
{
public
static
Options
buildCommandlineOptions
(
final
Options
options
)
{
Option
opt
=
new
Option
(
"w"
,
"threadCount"
,
true
,
"Thread count, Default: 32"
);
Option
opt
=
new
Option
(
"w"
,
"threadCount"
,
true
,
"Thread count, Default: 32"
);
opt
.
setRequired
(
false
);
opt
.
setRequired
(
false
);
...
@@ -175,84 +226,171 @@ public class TransactionProducer {
...
@@ -175,84 +226,171 @@ public class TransactionProducer {
opt
.
setRequired
(
false
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"
c"
,
"check"
,
true
,
"Check the message, Default: false
"
);
opt
=
new
Option
(
"
sr"
,
"send rollback rate"
,
true
,
"Send rollback rate, Default: 0.0
"
);
opt
.
setRequired
(
false
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"
r"
,
"checkResult"
,
true
,
"Message check result, Default: true
"
);
opt
=
new
Option
(
"
su"
,
"send unknown rate"
,
true
,
"Send unknown rate, Default: 0.0
"
);
opt
.
setRequired
(
false
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"cr"
,
"check rollback rate"
,
true
,
"Check rollback rate, Default: 0.0"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"cu"
,
"check unknown rate"
,
true
,
"Check unknown rate, Default: 0.0"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"b"
,
"test batch id"
,
true
,
"test batch id, Default: System.currentMillis()"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
opt
=
new
Option
(
"i"
,
"send interval"
,
true
,
"sleep interval in millis between messages, Default: 0"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
return
options
;
return
options
;
}
}
}
}
class
TransactionExecuterBImpl
implements
LocalTransactionExecuter
{
class
TransactionListenerImpl
implements
TransactionListener
{
private
StatsBenchmarkTProducer
statBenchmark
;
private
TxSendConfig
sendConfig
;
private
final
LRUMap
<
Long
,
Integer
>
cache
=
new
LRUMap
<>(
200000
);
private
boolean
ischeck
;
private
class
MsgMeta
{
long
batchId
;
long
msgId
;
LocalTransactionState
sendResult
;
List
<
LocalTransactionState
>
checkResult
;
}
public
TransactionExecuterBImpl
(
boolean
ischeck
)
{
public
TransactionListenerImpl
(
StatsBenchmarkTProducer
statsBenchmark
,
TxSendConfig
sendConfig
)
{
this
.
ischeck
=
ischeck
;
this
.
statBenchmark
=
statsBenchmark
;
this
.
sendConfig
=
sendConfig
;
}
}
@Override
@Override
public
LocalTransactionState
executeLocalTransactionBranch
(
final
Message
msg
,
final
Object
arg
)
{
public
LocalTransactionState
executeLocalTransaction
(
Message
msg
,
Object
arg
)
{
if
(
ischeck
)
{
return
parseFromMsg
(
msg
).
sendResult
;
return
LocalTransactionState
.
UNKNOW
;
}
}
return
LocalTransactionState
.
COMMIT_MESSAGE
;
}
}
class
TransactionCheckListenerBImpl
implements
TransactionCheckListener
{
private
MsgMeta
parseFromMsg
(
Message
msg
)
{
private
boolean
ischeckffalse
;
byte
[]
bs
=
msg
.
getBody
();
private
StatsBenchmarkTProducer
statsBenchmarkTProducer
;
ByteBuffer
buf
=
ByteBuffer
.
wrap
(
bs
);
MsgMeta
msgMeta
=
new
MsgMeta
();
public
TransactionCheckListenerBImpl
(
boolean
ischeckffalse
,
msgMeta
.
batchId
=
buf
.
getLong
();
StatsBenchmarkTProducer
statsBenchmarkTProducer
)
{
msgMeta
.
msgId
=
buf
.
getLong
();
this
.
ischeckffalse
=
ischeckffalse
;
msgMeta
.
sendResult
=
LocalTransactionState
.
values
()[
buf
.
get
()];
this
.
statsBenchmarkTProducer
=
statsBenchmarkTProducer
;
msgMeta
.
checkResult
=
new
ArrayList
<>();
for
(
int
i
=
0
;
i
<
TransactionProducer
.
MAX_CHECK_RESULT_IN_MSG
;
i
++)
{
msgMeta
.
checkResult
.
add
(
LocalTransactionState
.
values
()[
buf
.
get
()]);
}
return
msgMeta
;
}
}
@Override
@Override
public
LocalTransactionState
checkLocalTransactionState
(
MessageExt
msg
)
{
public
LocalTransactionState
checkLocalTransaction
(
MessageExt
msg
)
{
statsBenchmarkTProducer
.
getCheckRequestSuccessCount
().
incrementAndGet
();
MsgMeta
msgMeta
=
parseFromMsg
(
msg
);
if
(
ischeckffalse
)
{
if
(
msgMeta
.
batchId
!=
sendConfig
.
batchId
)
{
// message not generated in this test
return
LocalTransactionState
.
ROLLBACK_MESSAGE
;
}
statBenchmark
.
getCheckCount
().
incrementAndGet
();
int
times
=
0
;
try
{
String
checkTimes
=
msg
.
getUserProperty
(
MessageConst
.
PROPERTY_TRANSACTION_CHECK_TIMES
);
times
=
Integer
.
parseInt
(
checkTimes
);
}
catch
(
Exception
e
)
{
return
LocalTransactionState
.
ROLLBACK_MESSAGE
;
return
LocalTransactionState
.
ROLLBACK_MESSAGE
;
}
}
times
=
times
<=
0
?
1
:
times
;
boolean
dup
;
synchronized
(
cache
)
{
Integer
oldCheckLog
=
cache
.
get
(
msgMeta
.
msgId
);
Integer
newCheckLog
;
if
(
oldCheckLog
==
null
)
{
newCheckLog
=
1
<<
(
times
-
1
);
}
else
{
newCheckLog
=
oldCheckLog
|
(
1
<<
(
times
-
1
));
}
dup
=
newCheckLog
.
equals
(
oldCheckLog
);
}
if
(
dup
)
{
statBenchmark
.
getDuplicatedCheckCount
().
incrementAndGet
();
}
if
(
msgMeta
.
sendResult
!=
LocalTransactionState
.
UNKNOW
)
{
System
.
out
.
printf
(
"%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult=%s\n"
,
new
SimpleDateFormat
(
"HH:mm:ss,SSS"
).
format
(
new
Date
()),
msg
.
getMsgId
(),
msg
.
getTransactionId
(),
msg
.
getUserProperty
(
MessageConst
.
PROPERTY_TRANSACTION_CHECK_TIMES
),
msgMeta
.
sendResult
.
toString
());
statBenchmark
.
getUnexpectedCheckCount
().
incrementAndGet
();
return
msgMeta
.
sendResult
;
}
return
LocalTransactionState
.
COMMIT_MESSAGE
;
for
(
int
i
=
0
;
i
<
times
-
1
;
i
++)
{
LocalTransactionState
s
=
msgMeta
.
checkResult
.
get
(
i
);
if
(
s
!=
LocalTransactionState
.
UNKNOW
)
{
System
.
out
.
printf
(
"%s unexpected check: msgId=%s,txId=%s,checkTimes=%s,sendResult,lastCheckResult=%s\n"
,
new
SimpleDateFormat
(
"HH:mm:ss,SSS"
).
format
(
new
Date
()),
msg
.
getMsgId
(),
msg
.
getTransactionId
(),
msg
.
getUserProperty
(
MessageConst
.
PROPERTY_TRANSACTION_CHECK_TIMES
),
s
);
statBenchmark
.
getUnexpectedCheckCount
().
incrementAndGet
();
return
s
;
}
}
return
msgMeta
.
checkResult
.
get
(
times
-
1
);
}
}
}
}
class
Snapshot
{
long
endTime
;
long
sendRequestSuccessCount
;
long
sendRequestFailedCount
;
long
sendMessageTimeTotal
;
long
sendMessageMaxRT
;
long
checkCount
;
long
unexpectedCheckCount
;
long
duplicatedCheck
;
}
class
StatsBenchmarkTProducer
{
class
StatsBenchmarkTProducer
{
private
final
AtomicLong
sendRequestSuccessCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendRequestSuccessCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendRequestFailedCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendRequestFailedCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
receiveResponseSuccessCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageTimeTotal
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
receiveResponseFailedCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageSuccessTimeTotal
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageMaxRT
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
sendMessageMaxRT
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
checkRequestSuccessCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
checkCount
=
new
AtomicLong
(
0L
);
private
final
AtomicLong
unexpectedCheckCount
=
new
AtomicLong
(
0L
);
public
Long
[]
createSnapshot
()
{
private
final
AtomicLong
duplicatedCheckCount
=
new
AtomicLong
(
0
);
Long
[]
snap
=
new
Long
[]
{
System
.
currentTimeMillis
(),
this
.
sendRequestSuccessCount
.
get
(),
this
.
sendRequestFailedCount
.
get
(),
this
.
receiveResponseSuccessCount
.
get
(),
this
.
receiveResponseFailedCount
.
get
(),
this
.
sendMessageSuccessTimeTotal
.
get
(),
this
.
checkRequestSuccessCount
.
get
()};
return
snap
;
public
Snapshot
createSnapshot
()
{
Snapshot
s
=
new
Snapshot
();
s
.
endTime
=
System
.
currentTimeMillis
();
s
.
sendRequestSuccessCount
=
sendRequestSuccessCount
.
get
();
s
.
sendRequestFailedCount
=
sendRequestFailedCount
.
get
();
s
.
sendMessageTimeTotal
=
sendMessageTimeTotal
.
get
();
s
.
sendMessageMaxRT
=
sendMessageMaxRT
.
get
();
s
.
checkCount
=
checkCount
.
get
();
s
.
unexpectedCheckCount
=
unexpectedCheckCount
.
get
();
s
.
duplicatedCheck
=
duplicatedCheckCount
.
get
();
return
s
;
}
}
public
AtomicLong
getSendRequestSuccessCount
()
{
public
AtomicLong
getSendRequestSuccessCount
()
{
...
@@ -263,23 +401,49 @@ class StatsBenchmarkTProducer {
...
@@ -263,23 +401,49 @@ class StatsBenchmarkTProducer {
return
sendRequestFailedCount
;
return
sendRequestFailedCount
;
}
}
public
AtomicLong
get
ReceiveResponseSuccessCount
()
{
public
AtomicLong
get
SendMessageTimeTotal
()
{
return
receiveResponseSuccessCount
;
return
sendMessageTimeTotal
;
}
}
public
AtomicLong
get
ReceiveResponseFailedCount
()
{
public
AtomicLong
get
SendMessageMaxRT
()
{
return
receiveResponseFailedCount
;
return
sendMessageMaxRT
;
}
}
public
AtomicLong
get
SendMessageSuccessTimeTotal
()
{
public
AtomicLong
get
CheckCount
()
{
return
sendMessageSuccessTimeTotal
;
return
checkCount
;
}
}
public
AtomicLong
get
SendMessageMaxRT
()
{
public
AtomicLong
get
UnexpectedCheckCount
()
{
return
sendMessageMaxRT
;
return
unexpectedCheckCount
;
}
}
public
AtomicLong
getCheckRequestSuccessCount
()
{
public
AtomicLong
getDuplicatedCheckCount
()
{
return
checkRequestSuccessCount
;
return
duplicatedCheckCount
;
}
}
class
TxSendConfig
{
String
topic
;
int
threadCount
;
int
messageSize
;
double
sendRollbackRate
;
double
sendUnknownRate
;
double
checkRollbackRate
;
double
checkUnknownRate
;
long
batchId
;
int
sendInterval
;
}
class
LRUMap
<
K
,
V
>
extends
LinkedHashMap
<
K
,
V
>
{
private
int
maxSize
;
public
LRUMap
(
int
maxSize
)
{
this
.
maxSize
=
maxSize
;
}
@Override
protected
boolean
removeEldestEntry
(
Map
.
Entry
eldest
)
{
return
size
()
>
maxSize
;
}
}
}
}
srvutil/src/main/java/org/apache/rocketmq/srvutil/ServerUtil.java
浏览文件 @
34c34549
...
@@ -49,10 +49,11 @@ public class ServerUtil {
...
@@ -49,10 +49,11 @@ public class ServerUtil {
commandLine
=
parser
.
parse
(
options
,
args
);
commandLine
=
parser
.
parse
(
options
,
args
);
if
(
commandLine
.
hasOption
(
'h'
))
{
if
(
commandLine
.
hasOption
(
'h'
))
{
hf
.
printHelp
(
appName
,
options
,
true
);
hf
.
printHelp
(
appName
,
options
,
true
);
return
null
;
System
.
exit
(
0
)
;
}
}
}
catch
(
ParseException
e
)
{
}
catch
(
ParseException
e
)
{
hf
.
printHelp
(
appName
,
options
,
true
);
hf
.
printHelp
(
appName
,
options
,
true
);
System
.
exit
(
1
);
}
}
return
commandLine
;
return
commandLine
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录