Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
s920243400
Rocketmq
提交
43730b78
R
Rocketmq
项目概览
s920243400
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
1
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
43730b78
编写于
12月 13, 2017
作者:
J
Jaskey
提交者:
dongeforever
12月 13, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ROCKETMQ-96]Rename some temp variable and field closes apache/rocketmq#60
上级
c6741374
变更
2
显示空白变更内容
内联
并排
Showing
2 changed file
with
20 addition
and
17 deletion
+20
-17
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
...rg/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+17
-14
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
.../rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+3
-3
未找到文件。
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
浏览文件 @
43730b78
...
@@ -48,7 +48,10 @@ public class ProcessQueue {
...
@@ -48,7 +48,10 @@ public class ProcessQueue {
private
final
AtomicLong
msgCount
=
new
AtomicLong
();
private
final
AtomicLong
msgCount
=
new
AtomicLong
();
private
final
AtomicLong
msgSize
=
new
AtomicLong
();
private
final
AtomicLong
msgSize
=
new
AtomicLong
();
private
final
Lock
lockConsume
=
new
ReentrantLock
();
private
final
Lock
lockConsume
=
new
ReentrantLock
();
private
final
TreeMap
<
Long
,
MessageExt
>
msgTreeMapTemp
=
new
TreeMap
<
Long
,
MessageExt
>();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
private
final
TreeMap
<
Long
,
MessageExt
>
consumingMsgOrderlyTreeMap
=
new
TreeMap
<
Long
,
MessageExt
>();
private
final
AtomicLong
tryUnlockTimes
=
new
AtomicLong
(
0
);
private
final
AtomicLong
tryUnlockTimes
=
new
AtomicLong
(
0
);
private
volatile
long
queueOffsetMax
=
0L
;
private
volatile
long
queueOffsetMax
=
0L
;
private
volatile
boolean
dropped
=
false
;
private
volatile
boolean
dropped
=
false
;
...
@@ -243,8 +246,8 @@ public class ProcessQueue {
...
@@ -243,8 +246,8 @@ public class ProcessQueue {
try
{
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
try
{
try
{
this
.
msgTreeMap
.
putAll
(
this
.
msgTreeMapTem
p
);
this
.
msgTreeMap
.
putAll
(
this
.
consumingMsgOrderlyTreeMa
p
);
this
.
msgTreeMapTem
p
.
clear
();
this
.
consumingMsgOrderlyTreeMa
p
.
clear
();
}
finally
{
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
lockTreeMap
.
writeLock
().
unlock
();
}
}
...
@@ -257,12 +260,12 @@ public class ProcessQueue {
...
@@ -257,12 +260,12 @@ public class ProcessQueue {
try
{
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
try
{
try
{
Long
offset
=
this
.
msgTreeMapTem
p
.
lastKey
();
Long
offset
=
this
.
consumingMsgOrderlyTreeMa
p
.
lastKey
();
msgCount
.
addAndGet
(
0
-
this
.
msgTreeMapTem
p
.
size
());
msgCount
.
addAndGet
(
0
-
this
.
consumingMsgOrderlyTreeMa
p
.
size
());
for
(
MessageExt
msg
:
this
.
msgTreeMapTem
p
.
values
())
{
for
(
MessageExt
msg
:
this
.
consumingMsgOrderlyTreeMa
p
.
values
())
{
msgSize
.
addAndGet
(
0
-
msg
.
getBody
().
length
);
msgSize
.
addAndGet
(
0
-
msg
.
getBody
().
length
);
}
}
this
.
msgTreeMapTem
p
.
clear
();
this
.
consumingMsgOrderlyTreeMa
p
.
clear
();
if
(
offset
!=
null
)
{
if
(
offset
!=
null
)
{
return
offset
+
1
;
return
offset
+
1
;
}
}
...
@@ -281,7 +284,7 @@ public class ProcessQueue {
...
@@ -281,7 +284,7 @@ public class ProcessQueue {
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
try
{
try
{
for
(
MessageExt
msg
:
msgs
)
{
for
(
MessageExt
msg
:
msgs
)
{
this
.
msgTreeMapTem
p
.
remove
(
msg
.
getQueueOffset
());
this
.
consumingMsgOrderlyTreeMa
p
.
remove
(
msg
.
getQueueOffset
());
this
.
msgTreeMap
.
put
(
msg
.
getQueueOffset
(),
msg
);
this
.
msgTreeMap
.
put
(
msg
.
getQueueOffset
(),
msg
);
}
}
}
finally
{
}
finally
{
...
@@ -304,7 +307,7 @@ public class ProcessQueue {
...
@@ -304,7 +307,7 @@ public class ProcessQueue {
Map
.
Entry
<
Long
,
MessageExt
>
entry
=
this
.
msgTreeMap
.
pollFirstEntry
();
Map
.
Entry
<
Long
,
MessageExt
>
entry
=
this
.
msgTreeMap
.
pollFirstEntry
();
if
(
entry
!=
null
)
{
if
(
entry
!=
null
)
{
result
.
add
(
entry
.
getValue
());
result
.
add
(
entry
.
getValue
());
msgTreeMapTem
p
.
put
(
entry
.
getKey
(),
entry
.
getValue
());
consumingMsgOrderlyTreeMa
p
.
put
(
entry
.
getKey
(),
entry
.
getValue
());
}
else
{
}
else
{
break
;
break
;
}
}
...
@@ -343,7 +346,7 @@ public class ProcessQueue {
...
@@ -343,7 +346,7 @@ public class ProcessQueue {
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
try
{
try
{
this
.
msgTreeMap
.
clear
();
this
.
msgTreeMap
.
clear
();
this
.
msgTreeMapTem
p
.
clear
();
this
.
consumingMsgOrderlyTreeMa
p
.
clear
();
this
.
msgCount
.
set
(
0
);
this
.
msgCount
.
set
(
0
);
this
.
msgSize
.
set
(
0
);
this
.
msgSize
.
set
(
0
);
this
.
queueOffsetMax
=
0L
;
this
.
queueOffsetMax
=
0L
;
...
@@ -402,10 +405,10 @@ public class ProcessQueue {
...
@@ -402,10 +405,10 @@ public class ProcessQueue {
info
.
setCachedMsgSizeInMiB
((
int
)
(
this
.
msgSize
.
get
()
/
(
1024
*
1024
)));
info
.
setCachedMsgSizeInMiB
((
int
)
(
this
.
msgSize
.
get
()
/
(
1024
*
1024
)));
}
}
if
(!
this
.
msgTreeMapTem
p
.
isEmpty
())
{
if
(!
this
.
consumingMsgOrderlyTreeMa
p
.
isEmpty
())
{
info
.
setTransactionMsgMinOffset
(
this
.
msgTreeMapTem
p
.
firstKey
());
info
.
setTransactionMsgMinOffset
(
this
.
consumingMsgOrderlyTreeMa
p
.
firstKey
());
info
.
setTransactionMsgMaxOffset
(
this
.
msgTreeMapTem
p
.
lastKey
());
info
.
setTransactionMsgMaxOffset
(
this
.
consumingMsgOrderlyTreeMa
p
.
lastKey
());
info
.
setTransactionMsgCount
(
this
.
msgTreeMapTem
p
.
size
());
info
.
setTransactionMsgCount
(
this
.
consumingMsgOrderlyTreeMa
p
.
size
());
}
}
info
.
setLocked
(
this
.
locked
);
info
.
setLocked
(
this
.
locked
);
...
...
client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
浏览文件 @
43730b78
...
@@ -454,9 +454,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
...
@@ -454,9 +454,9 @@ public class DefaultMQProducerImpl implements MQProducerInner {
String
[]
brokersSent
=
new
String
[
timesTotal
];
String
[]
brokersSent
=
new
String
[
timesTotal
];
for
(;
times
<
timesTotal
;
times
++)
{
for
(;
times
<
timesTotal
;
times
++)
{
String
lastBrokerName
=
null
==
mq
?
null
:
mq
.
getBrokerName
();
String
lastBrokerName
=
null
==
mq
?
null
:
mq
.
getBrokerName
();
MessageQueue
tmpmq
=
this
.
selectOneMessageQueue
(
topicPublishInfo
,
lastBrokerName
);
MessageQueue
mqSelected
=
this
.
selectOneMessageQueue
(
topicPublishInfo
,
lastBrokerName
);
if
(
tmpmq
!=
null
)
{
if
(
mqSelected
!=
null
)
{
mq
=
tmpmq
;
mq
=
mqSelected
;
brokersSent
[
times
]
=
mq
.
getBrokerName
();
brokersSent
[
times
]
=
mq
.
getBrokerName
();
try
{
try
{
beginTimestampPrev
=
System
.
currentTimeMillis
();
beginTimestampPrev
=
System
.
currentTimeMillis
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录