Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
e297221d
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
e297221d
编写于
2月 01, 2021
作者:
赵
赵延
提交者:
GitHub
2月 01, 2021
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ISSUE #2622] Change variable name 'lockTreeMap' to 'treeMapLock' (#2624)
上级
ea53d441
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
43 addition
and
43 deletion
+43
-43
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
...n/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+1
-1
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
.../org/apache/rocketmq/broker/topic/TopicConfigManager.java
+7
-7
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
...mq/client/impl/consumer/ConsumeMessageOrderlyService.java
+2
-2
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
...rg/apache/rocketmq/client/impl/consumer/ProcessQueue.java
+28
-28
client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
...ache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+2
-2
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
...g/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+3
-3
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
浏览文件 @
e297221d
...
...
@@ -146,7 +146,7 @@ public class BrokerOuterAPI {
@Override
public
void
run
()
{
try
{
RegisterBrokerResult
result
=
registerBroker
(
namesrvAddr
,
oneway
,
timeoutMills
,
requestHeader
,
body
);
RegisterBrokerResult
result
=
registerBroker
(
namesrvAddr
,
oneway
,
timeoutMills
,
requestHeader
,
body
);
if
(
result
!=
null
)
{
registerBrokerResultList
.
add
(
result
);
}
...
...
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
浏览文件 @
e297221d
...
...
@@ -45,7 +45,7 @@ public class TopicConfigManager extends ConfigManager {
private
static
final
long
LOCK_TIMEOUT_MILLIS
=
3000
;
private
static
final
int
SCHEDULE_TOPIC_QUEUE_NUM
=
18
;
private
transient
final
Lock
lockTopicConfigTable
=
new
ReentrantLock
();
private
transient
final
Lock
topicConfigTableLock
=
new
ReentrantLock
();
private
final
ConcurrentMap
<
String
,
TopicConfig
>
topicConfigTable
=
new
ConcurrentHashMap
<
String
,
TopicConfig
>(
1024
);
...
...
@@ -159,7 +159,7 @@ public class TopicConfigManager extends ConfigManager {
boolean
createNew
=
false
;
try
{
if
(
this
.
lockTopicConfigTable
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
if
(
this
.
topicConfigTableLock
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
try
{
topicConfig
=
this
.
topicConfigTable
.
get
(
topic
);
if
(
topicConfig
!=
null
)
...
...
@@ -213,7 +213,7 @@ public class TopicConfigManager extends ConfigManager {
this
.
persist
();
}
}
finally
{
this
.
lockTopicConfigTable
.
unlock
();
this
.
topicConfigTableLock
.
unlock
();
}
}
}
catch
(
InterruptedException
e
)
{
...
...
@@ -239,7 +239,7 @@ public class TopicConfigManager extends ConfigManager {
boolean
createNew
=
false
;
try
{
if
(
this
.
lockTopicConfigTable
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
if
(
this
.
topicConfigTableLock
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
try
{
topicConfig
=
this
.
topicConfigTable
.
get
(
topic
);
if
(
topicConfig
!=
null
)
...
...
@@ -257,7 +257,7 @@ public class TopicConfigManager extends ConfigManager {
this
.
dataVersion
.
nextVersion
();
this
.
persist
();
}
finally
{
this
.
lockTopicConfigTable
.
unlock
();
this
.
topicConfigTableLock
.
unlock
();
}
}
}
catch
(
InterruptedException
e
)
{
...
...
@@ -279,7 +279,7 @@ public class TopicConfigManager extends ConfigManager {
boolean
createNew
=
false
;
try
{
if
(
this
.
lockTopicConfigTable
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
if
(
this
.
topicConfigTableLock
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
try
{
topicConfig
=
this
.
topicConfigTable
.
get
(
TopicValidator
.
RMQ_SYS_TRANS_CHECK_MAX_TIME_TOPIC
);
if
(
topicConfig
!=
null
)
...
...
@@ -297,7 +297,7 @@ public class TopicConfigManager extends ConfigManager {
this
.
dataVersion
.
nextVersion
();
this
.
persist
();
}
finally
{
this
.
lockTopicConfigTable
.
unlock
();
this
.
topicConfigTableLock
.
unlock
();
}
}
}
catch
(
InterruptedException
e
)
{
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
浏览文件 @
e297221d
...
...
@@ -478,7 +478,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
ConsumeReturnType
returnType
=
ConsumeReturnType
.
SUCCESS
;
boolean
hasException
=
false
;
try
{
this
.
processQueue
.
get
LockConsume
().
lock
();
this
.
processQueue
.
get
ConsumeLock
().
lock
();
if
(
this
.
processQueue
.
isDropped
())
{
log
.
warn
(
"consumeMessage, the message queue not be able to consume, because it's dropped. {}"
,
this
.
messageQueue
);
...
...
@@ -494,7 +494,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
messageQueue
);
hasException
=
true
;
}
finally
{
this
.
processQueue
.
get
LockConsume
().
unlock
();
this
.
processQueue
.
get
ConsumeLock
().
unlock
();
}
if
(
null
==
status
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/ProcessQueue.java
浏览文件 @
e297221d
...
...
@@ -44,11 +44,11 @@ public class ProcessQueue {
public
final
static
long
REBALANCE_LOCK_INTERVAL
=
Long
.
parseLong
(
System
.
getProperty
(
"rocketmq.client.rebalance.lockInterval"
,
"20000"
));
private
final
static
long
PULL_MAX_IDLE_TIME
=
Long
.
parseLong
(
System
.
getProperty
(
"rocketmq.client.pull.pullMaxIdleTime"
,
"120000"
));
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
private
final
ReadWriteLock
lockTreeMap
=
new
ReentrantReadWriteLock
();
private
final
ReadWriteLock
treeMapLock
=
new
ReentrantReadWriteLock
();
private
final
TreeMap
<
Long
,
MessageExt
>
msgTreeMap
=
new
TreeMap
<
Long
,
MessageExt
>();
private
final
AtomicLong
msgCount
=
new
AtomicLong
();
private
final
AtomicLong
msgSize
=
new
AtomicLong
();
private
final
Lock
lockConsume
=
new
ReentrantLock
();
private
final
Lock
consumeLock
=
new
ReentrantLock
();
/**
* A subset of msgTreeMap, will only be used when orderly consume
*/
...
...
@@ -83,7 +83,7 @@ public class ProcessQueue {
for
(
int
i
=
0
;
i
<
loop
;
i
++)
{
MessageExt
msg
=
null
;
try
{
this
.
lockTreeMap
.
readLock
().
lockInterruptibly
();
this
.
treeMapLock
.
readLock
().
lockInterruptibly
();
try
{
if
(!
msgTreeMap
.
isEmpty
()
&&
System
.
currentTimeMillis
()
-
Long
.
parseLong
(
MessageAccessor
.
getConsumeStartTimeStamp
(
msgTreeMap
.
firstEntry
().
getValue
()))
>
pushConsumer
.
getConsumeTimeout
()
*
60
*
1000
)
{
msg
=
msgTreeMap
.
firstEntry
().
getValue
();
...
...
@@ -92,7 +92,7 @@ public class ProcessQueue {
break
;
}
}
finally
{
this
.
lockTreeMap
.
readLock
().
unlock
();
this
.
treeMapLock
.
readLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"getExpiredMsg exception"
,
e
);
...
...
@@ -103,7 +103,7 @@ public class ProcessQueue {
pushConsumer
.
sendMessageBack
(
msg
,
3
);
log
.
info
(
"send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}"
,
msg
.
getTopic
(),
msg
.
getMsgId
(),
msg
.
getStoreHost
(),
msg
.
getQueueId
(),
msg
.
getQueueOffset
());
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
try
{
if
(!
msgTreeMap
.
isEmpty
()
&&
msg
.
getQueueOffset
()
==
msgTreeMap
.
firstKey
())
{
try
{
...
...
@@ -113,7 +113,7 @@ public class ProcessQueue {
}
}
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"getExpiredMsg exception"
,
e
);
...
...
@@ -127,7 +127,7 @@ public class ProcessQueue {
public
boolean
putMessage
(
final
List
<
MessageExt
>
msgs
)
{
boolean
dispatchToConsume
=
false
;
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
try
{
int
validMsgCnt
=
0
;
for
(
MessageExt
msg
:
msgs
)
{
...
...
@@ -156,7 +156,7 @@ public class ProcessQueue {
}
}
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"putMessage exception"
,
e
);
...
...
@@ -167,13 +167,13 @@ public class ProcessQueue {
public
long
getMaxSpan
()
{
try
{
this
.
lockTreeMap
.
readLock
().
lockInterruptibly
();
this
.
treeMapLock
.
readLock
().
lockInterruptibly
();
try
{
if
(!
this
.
msgTreeMap
.
isEmpty
())
{
return
this
.
msgTreeMap
.
lastKey
()
-
this
.
msgTreeMap
.
firstKey
();
}
}
finally
{
this
.
lockTreeMap
.
readLock
().
unlock
();
this
.
treeMapLock
.
readLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"getMaxSpan exception"
,
e
);
...
...
@@ -186,7 +186,7 @@ public class ProcessQueue {
long
result
=
-
1
;
final
long
now
=
System
.
currentTimeMillis
();
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
this
.
lastConsumeTimestamp
=
now
;
try
{
if
(!
msgTreeMap
.
isEmpty
())
{
...
...
@@ -206,7 +206,7 @@ public class ProcessQueue {
}
}
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
Throwable
t
)
{
log
.
error
(
"removeMessage exception"
,
t
);
...
...
@@ -245,12 +245,12 @@ public class ProcessQueue {
public
void
rollback
()
{
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
try
{
this
.
msgTreeMap
.
putAll
(
this
.
consumingMsgOrderlyTreeMap
);
this
.
consumingMsgOrderlyTreeMap
.
clear
();
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"rollback exception"
,
e
);
...
...
@@ -259,7 +259,7 @@ public class ProcessQueue {
public
long
commit
()
{
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
try
{
Long
offset
=
this
.
consumingMsgOrderlyTreeMap
.
lastKey
();
msgCount
.
addAndGet
(
0
-
this
.
consumingMsgOrderlyTreeMap
.
size
());
...
...
@@ -271,7 +271,7 @@ public class ProcessQueue {
return
offset
+
1
;
}
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"commit exception"
,
e
);
...
...
@@ -282,14 +282,14 @@ public class ProcessQueue {
public
void
makeMessageToConsumeAgain
(
List
<
MessageExt
>
msgs
)
{
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
try
{
for
(
MessageExt
msg
:
msgs
)
{
this
.
consumingMsgOrderlyTreeMap
.
remove
(
msg
.
getQueueOffset
());
this
.
msgTreeMap
.
put
(
msg
.
getQueueOffset
(),
msg
);
}
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"makeMessageToCosumeAgain exception"
,
e
);
...
...
@@ -300,7 +300,7 @@ public class ProcessQueue {
List
<
MessageExt
>
result
=
new
ArrayList
<
MessageExt
>(
batchSize
);
final
long
now
=
System
.
currentTimeMillis
();
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
this
.
lastConsumeTimestamp
=
now
;
try
{
if
(!
this
.
msgTreeMap
.
isEmpty
())
{
...
...
@@ -319,7 +319,7 @@ public class ProcessQueue {
consuming
=
false
;
}
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"take Messages exception"
,
e
);
...
...
@@ -330,11 +330,11 @@ public class ProcessQueue {
public
boolean
hasTempMessage
()
{
try
{
this
.
lockTreeMap
.
readLock
().
lockInterruptibly
();
this
.
treeMapLock
.
readLock
().
lockInterruptibly
();
try
{
return
!
this
.
msgTreeMap
.
isEmpty
();
}
finally
{
this
.
lockTreeMap
.
readLock
().
unlock
();
this
.
treeMapLock
.
readLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
}
...
...
@@ -344,7 +344,7 @@ public class ProcessQueue {
public
void
clear
()
{
try
{
this
.
lockTreeMap
.
writeLock
().
lockInterruptibly
();
this
.
treeMapLock
.
writeLock
().
lockInterruptibly
();
try
{
this
.
msgTreeMap
.
clear
();
this
.
consumingMsgOrderlyTreeMap
.
clear
();
...
...
@@ -352,7 +352,7 @@ public class ProcessQueue {
this
.
msgSize
.
set
(
0
);
this
.
queueOffsetMax
=
0L
;
}
finally
{
this
.
lockTreeMap
.
writeLock
().
unlock
();
this
.
treeMapLock
.
writeLock
().
unlock
();
}
}
catch
(
InterruptedException
e
)
{
log
.
error
(
"rollback exception"
,
e
);
...
...
@@ -367,8 +367,8 @@ public class ProcessQueue {
this
.
lastLockTimestamp
=
lastLockTimestamp
;
}
public
Lock
get
LockConsume
()
{
return
lockConsume
;
public
Lock
get
ConsumeLock
()
{
return
consumeLock
;
}
public
long
getLastPullTimestamp
()
{
...
...
@@ -397,7 +397,7 @@ public class ProcessQueue {
public
void
fillProcessQueueInfo
(
final
ProcessQueueInfo
info
)
{
try
{
this
.
lockTreeMap
.
readLock
().
lockInterruptibly
();
this
.
treeMapLock
.
readLock
().
lockInterruptibly
();
if
(!
this
.
msgTreeMap
.
isEmpty
())
{
info
.
setCachedMsgMinOffset
(
this
.
msgTreeMap
.
firstKey
());
...
...
@@ -421,7 +421,7 @@ public class ProcessQueue {
info
.
setLastConsumeTimestamp
(
this
.
lastConsumeTimestamp
);
}
catch
(
Exception
e
)
{
}
finally
{
this
.
lockTreeMap
.
readLock
().
unlock
();
this
.
treeMapLock
.
readLock
().
unlock
();
}
}
...
...
client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
浏览文件 @
e297221d
...
...
@@ -88,11 +88,11 @@ public class RebalancePushImpl extends RebalanceImpl {
if
(
this
.
defaultMQPushConsumerImpl
.
isConsumeOrderly
()
&&
MessageModel
.
CLUSTERING
.
equals
(
this
.
defaultMQPushConsumerImpl
.
messageModel
()))
{
try
{
if
(
pq
.
get
LockConsume
().
tryLock
(
1000
,
TimeUnit
.
MILLISECONDS
))
{
if
(
pq
.
get
ConsumeLock
().
tryLock
(
1000
,
TimeUnit
.
MILLISECONDS
))
{
try
{
return
this
.
unlockDelay
(
mq
,
pq
);
}
finally
{
pq
.
get
LockConsume
().
unlock
();
pq
.
get
ConsumeLock
().
unlock
();
}
}
else
{
log
.
warn
(
"[WRONG]mq is consuming, so can not unlock it, {}. maybe hanged for a while, {}"
,
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
浏览文件 @
e297221d
...
...
@@ -84,7 +84,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private
final
AtomicReference
<
List
<
String
>>
namesrvAddrList
=
new
AtomicReference
<
List
<
String
>>();
private
final
AtomicReference
<
String
>
namesrvAddrChoosed
=
new
AtomicReference
<
String
>();
private
final
AtomicInteger
namesrvIndex
=
new
AtomicInteger
(
initValueIndex
());
private
final
Lock
lockNamesrvChannel
=
new
ReentrantLock
();
private
final
Lock
namesrvChannelLock
=
new
ReentrantLock
();
private
final
ExecutorService
publicExecutor
;
...
...
@@ -418,7 +418,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
final
List
<
String
>
addrList
=
this
.
namesrvAddrList
.
get
();
if
(
this
.
lockNamesrvChannel
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
if
(
this
.
namesrvChannelLock
.
tryLock
(
LOCK_TIMEOUT_MILLIS
,
TimeUnit
.
MILLISECONDS
))
{
try
{
addr
=
this
.
namesrvAddrChoosed
.
get
();
if
(
addr
!=
null
)
{
...
...
@@ -445,7 +445,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
throw
new
RemotingConnectException
(
addrList
.
toString
());
}
}
finally
{
this
.
lockNamesrvChannel
.
unlock
();
this
.
namesrvChannelLock
.
unlock
();
}
}
else
{
log
.
warn
(
"getAndCreateNameserverChannel: try to lock name server, but timeout, {}ms"
,
LOCK_TIMEOUT_MILLIS
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录