Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
238f9bcc
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
238f9bcc
编写于
3年前
作者:
C
chenzlalvin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[RIP-21] submodule store & broker & srvutil
上级
43744e7d
5.0.0-preview
5.0.0-alpha
5.0.0-alpha-static-topic
rocketmq-all-5.0.0-PREVIEW
无相关合并请求
变更
21
展开全部
隐藏空白更改
内联
并排
Showing
21 changed file
with
2196 addition
and
191 deletion
+2196
-191
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+91
-13
broker/src/main/java/org/apache/rocketmq/broker/domain/LogicalQueuesInfoInBroker.java
...che/rocketmq/broker/domain/LogicalQueuesInfoInBroker.java
+116
-0
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
...n/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+21
-0
broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
...he/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+5
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
...cketmq/broker/processor/AbstractSendMessageProcessor.java
+172
-1
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
...pache/rocketmq/broker/processor/AdminBrokerProcessor.java
+700
-14
broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
...pache/rocketmq/broker/processor/PullMessageProcessor.java
+115
-26
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
...pache/rocketmq/broker/processor/SendMessageProcessor.java
+17
-76
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
.../org/apache/rocketmq/broker/topic/TopicConfigManager.java
+114
-3
broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
...t/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+29
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
...e/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+352
-16
broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
...e/rocketmq/broker/processor/PullMessageProcessorTest.java
+101
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
...e/rocketmq/broker/processor/SendMessageProcessorTest.java
+71
-6
broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
.../apache/rocketmq/broker/topic/TopicConfigManagerTest.java
+138
-0
srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
...va/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
+50
-0
store/src/main/java/org/apache/rocketmq/store/CleanFilesHook.java
...c/main/java/org/apache/rocketmq/store/CleanFilesHook.java
+23
-0
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+11
-1
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
...n/java/org/apache/rocketmq/store/DefaultMessageStore.java
+49
-15
store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
.../main/java/org/apache/rocketmq/store/MappedFileQueue.java
+10
-19
store/src/main/java/org/apache/rocketmq/store/MessageStore.java
...src/main/java/org/apache/rocketmq/store/MessageStore.java
+10
-0
store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
...t/java/org/apache/rocketmq/store/MappedFileQueueTest.java
+1
-1
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
238f9bcc
...
...
@@ -16,13 +16,18 @@
*/
package
org.apache.rocketmq.broker
;
import
com.google.common.collect.Maps
;
import
java.io.IOException
;
import
java.net.InetSocketAddress
;
import
java.util.AbstractMap
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Objects
;
import
java.util.Optional
;
import
java.util.concurrent.BlockingQueue
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
...
...
@@ -33,6 +38,8 @@ import java.util.concurrent.LinkedBlockingQueue;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.ThreadPoolExecutor
;
import
java.util.concurrent.TimeUnit
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
org.apache.rocketmq.acl.AccessValidator
;
import
org.apache.rocketmq.broker.client.ClientHousekeepingService
;
import
org.apache.rocketmq.broker.client.ConsumerIdsChangeListener
;
...
...
@@ -42,6 +49,7 @@ import org.apache.rocketmq.broker.client.ProducerManager;
import
org.apache.rocketmq.broker.client.net.Broker2Client
;
import
org.apache.rocketmq.broker.client.rebalance.RebalanceLockManager
;
import
org.apache.rocketmq.broker.dledger.DLedgerRoleChangeHandler
;
import
org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker
;
import
org.apache.rocketmq.broker.filter.CommitLogDispatcherCalcBitMap
;
import
org.apache.rocketmq.broker.filter.ConsumerFilterManager
;
import
org.apache.rocketmq.broker.filtersrv.FilterServerManager
;
...
...
@@ -79,9 +87,11 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge
;
import
org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl
;
import
org.apache.rocketmq.broker.util.ServiceProvider
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.Configuration
;
import
org.apache.rocketmq.common.DataVersion
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.ThreadFactoryImpl
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.UtilAll
;
...
...
@@ -89,13 +99,18 @@ import org.apache.rocketmq.common.constant.LoggerName;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.common.namesrv.RegisterBrokerResult
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
import
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo
;
import
org.apache.rocketmq.common.stats.MomentStatsItem
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.RemotingServer
;
import
org.apache.rocketmq.remoting.common.TlsMode
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyRemotingServer
;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
...
...
@@ -159,6 +174,7 @@ public class BrokerController {
private
final
BrokerStatsManager
brokerStatsManager
;
private
final
List
<
SendMessageHook
>
sendMessageHookList
=
new
ArrayList
<
SendMessageHook
>();
private
final
List
<
ConsumeMessageHook
>
consumeMessageHookList
=
new
ArrayList
<
ConsumeMessageHook
>();
private
final
ConcurrentMap
<
String
,
String
>
brokerName2AddrMap
=
Maps
.
newConcurrentMap
();
private
MessageStore
messageStore
;
private
RemotingServer
remotingServer
;
private
RemotingServer
fastRemotingServer
;
...
...
@@ -277,9 +293,9 @@ public class BrokerController {
if
(
result
)
{
try
{
this
.
messageStore
=
new
DefaultMessageStore
(
this
.
messageStoreConfig
,
this
.
brokerStatsManager
,
this
.
messageArrivingListener
,
this
.
brokerConfig
)
;
DefaultMessageStore
messageStore
=
new
DefaultMessageStore
(
this
.
messageStoreConfig
,
this
.
brokerStatsManager
,
this
.
messageArrivingListener
,
this
.
brokerConfig
);
messageStore
.
registerCleanFileHook
(
topicConfigManager
.
getLogicalQueueCleanHook
());
this
.
messageStore
=
messageStore
;
if
(
messageStoreConfig
.
isEnableDLegerCommitLog
())
{
DLedgerRoleChangeHandler
roleChangeHandler
=
new
DLedgerRoleChangeHandler
(
this
,
(
DefaultMessageStore
)
messageStore
);
((
DLedgerCommitLog
)((
DefaultMessageStore
)
messageStore
).
getCommitLog
()).
getdLedgerServer
().
getdLedgerLeaderElector
().
addRoleChangeHandler
(
roleChangeHandler
);
...
...
@@ -467,6 +483,14 @@ public class BrokerController {
},
1000
*
10
,
1000
*
60
*
2
,
TimeUnit
.
MILLISECONDS
);
}
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(()
->
{
try
{
BrokerController
.
this
.
refreshBrokerNameMapping
();
}
catch
(
Exception
e
)
{
log
.
error
(
"ScheduledTask examineBrokerClusterInfo exception"
,
e
);
}
},
10
,
10
,
TimeUnit
.
SECONDS
);
if
(!
messageStoreConfig
.
isEnableDLegerCommitLog
())
{
if
(
BrokerRole
.
SLAVE
==
this
.
messageStoreConfig
.
getBrokerRole
())
{
if
(
this
.
messageStoreConfig
.
getHaMasterAddress
()
!=
null
&&
this
.
messageStoreConfig
.
getHaMasterAddress
().
length
()
>=
6
)
{
...
...
@@ -593,6 +617,18 @@ public class BrokerController {
}
}
private
void
refreshBrokerNameMapping
()
throws
InterruptedException
,
MQBrokerException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
ClusterInfo
brokerClusterInfo
=
this
.
brokerOuterAPI
.
getBrokerClusterInfo
();
brokerClusterInfo
.
getBrokerAddrTable
().
forEach
((
brokerName
,
data
)
->
{
String
masterBrokerAddr
=
data
.
getBrokerAddrs
().
get
(
MixAll
.
MASTER_ID
);
this
.
brokerName2AddrMap
.
put
(
brokerName
,
masterBrokerAddr
);
});
}
public
String
getBrokerAddrByName
(
String
brokerName
)
{
return
this
.
brokerName2AddrMap
.
get
(
brokerName
);
}
public
void
registerProcessor
()
{
/**
* SendMessageProcessor
...
...
@@ -1009,20 +1045,54 @@ public class BrokerController {
}
public
synchronized
void
registerIncrementBrokerData
(
TopicConfig
topicConfig
,
DataVersion
dataVersion
)
{
TopicConfig
registerTopicConfig
=
topicConfig
;
if
(!
PermName
.
isWriteable
(
this
.
getBrokerConfig
().
getBrokerPermission
())
||
!
PermName
.
isReadable
(
this
.
getBrokerConfig
().
getBrokerPermission
()))
{
registerTopicConfig
=
new
TopicConfig
(
topicConfig
.
getTopicName
(),
topicConfig
.
getReadQueueNums
(),
topicConfig
.
getWriteQueueNums
(),
this
.
brokerConfig
.
getBrokerPermission
())
;
this
.
registerIncrementBrokerData
(
Collections
.
singletonList
(
topicConfig
),
dataVersion
)
;
}
public
synchronized
void
registerIncrementBrokerData
(
List
<
TopicConfig
>
topicConfigList
,
DataVersion
dataVersion
)
{
if
(
topicConfigList
==
null
||
topicConfigList
.
isEmpty
())
{
return
;
}
ConcurrentMap
<
String
,
TopicConfig
>
topicConfigTable
=
new
ConcurrentHashMap
<
String
,
TopicConfig
>();
topicConfigTable
.
put
(
topicConfig
.
getTopicName
(),
registerTopicConfig
);
TopicConfigSerializeWrapper
topicConfigSerializeWrapper
=
new
TopicConfigSerializeWrapper
();
topicConfigSerializeWrapper
.
setDataVersion
(
dataVersion
);
ConcurrentMap
<
String
,
TopicConfig
>
topicConfigTable
=
topicConfigList
.
stream
()
.
map
(
topicConfig
->
{
TopicConfig
registerTopicConfig
;
if
(!
PermName
.
isWriteable
(
this
.
getBrokerConfig
().
getBrokerPermission
())
||
!
PermName
.
isReadable
(
this
.
getBrokerConfig
().
getBrokerPermission
()))
{
registerTopicConfig
=
new
TopicConfig
(
topicConfig
.
getTopicName
(),
topicConfig
.
getReadQueueNums
(),
topicConfig
.
getWriteQueueNums
(),
this
.
brokerConfig
.
getBrokerPermission
());
}
else
{
registerTopicConfig
=
new
TopicConfig
(
topicConfig
);
}
return
registerTopicConfig
;
})
.
collect
(
Collectors
.
toConcurrentMap
(
TopicConfig:
:
getTopicName
,
Function
.
identity
()));
topicConfigSerializeWrapper
.
setTopicConfigTable
(
topicConfigTable
);
String
brokerName
=
this
.
brokerConfig
.
getBrokerName
();
Map
<
String
,
LogicalQueuesInfo
>
logicalQueuesInfoMap
=
topicConfigList
.
stream
()
.
map
(
TopicConfig:
:
getTopicName
)
.
map
(
topicName
->
Optional
.
ofNullable
(
this
.
topicConfigManager
.
selectLogicalQueuesInfo
(
topicName
))
.
map
(
info
->
{
info
.
readLock
().
lock
();
try
{
return
new
AbstractMap
.
SimpleImmutableEntry
<>(
topicName
,
new
LogicalQueuesInfoInBroker
(
info
,
data
->
Objects
.
equals
(
data
.
getBrokerName
(),
brokerName
)));
}
finally
{
info
.
readLock
().
unlock
();
}
})
.
orElse
(
null
))
.
filter
(
Objects:
:
nonNull
)
.
collect
(
Collectors
.
toMap
(
Map
.
Entry
::
getKey
,
Map
.
Entry
::
getValue
));
if
(!
logicalQueuesInfoMap
.
isEmpty
())
{
topicConfigSerializeWrapper
.
setLogicalQueuesInfoMap
(
logicalQueuesInfoMap
);
}
doRegisterBrokerAll
(
true
,
false
,
topicConfigSerializeWrapper
);
}
...
...
@@ -1032,13 +1102,21 @@ public class BrokerController {
if
(!
PermName
.
isWriteable
(
this
.
getBrokerConfig
().
getBrokerPermission
())
||
!
PermName
.
isReadable
(
this
.
getBrokerConfig
().
getBrokerPermission
()))
{
ConcurrentHashMap
<
String
,
TopicConfig
>
topicConfigTable
=
new
ConcurrentHashMap
<
String
,
TopicConfig
>();
Map
<
String
,
LogicalQueuesInfo
>
logicalQueuesInfoMap
=
Maps
.
newHashMapWithExpectedSize
(
topicConfigWrapper
.
getTopicConfigTable
().
size
());
for
(
TopicConfig
topicConfig
:
topicConfigWrapper
.
getTopicConfigTable
().
values
())
{
String
topicName
=
topicConfig
.
getTopicName
();
TopicConfig
tmp
=
new
TopicConfig
(
topic
Config
.
getTopicName
()
,
topicConfig
.
getReadQueueNums
(),
topicConfig
.
getWriteQueueNums
(),
new
TopicConfig
(
topic
Name
,
topicConfig
.
getReadQueueNums
(),
topicConfig
.
getWriteQueueNums
(),
this
.
brokerConfig
.
getBrokerPermission
());
topicConfigTable
.
put
(
topicConfig
.
getTopicName
(),
tmp
);
topicConfigTable
.
put
(
topicName
,
tmp
);
LogicalQueuesInfoInBroker
logicalQueuesInfo
=
this
.
topicConfigManager
.
selectLogicalQueuesInfo
(
topicName
);
if
(
logicalQueuesInfo
!=
null
)
{
String
brokerName
=
this
.
brokerConfig
.
getBrokerName
();
logicalQueuesInfoMap
.
put
(
topicName
,
new
LogicalQueuesInfoInBroker
(
logicalQueuesInfo
,
data
->
Objects
.
equals
(
data
.
getBrokerName
(),
brokerName
)));
}
}
topicConfigWrapper
.
setTopicConfigTable
(
topicConfigTable
);
topicConfigWrapper
.
setLogicalQueuesInfoMap
(
logicalQueuesInfoMap
);
}
if
(
forceRegister
||
needRegister
(
this
.
brokerConfig
.
getBrokerClusterName
(),
...
...
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/domain/LogicalQueuesInfoInBroker.java
0 → 100644
浏览文件 @
238f9bcc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.domain
;
import
com.alibaba.fastjson.parser.ParserConfig
;
import
com.google.common.collect.Maps
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.ConcurrentNavigableMap
;
import
java.util.concurrent.ConcurrentSkipListMap
;
import
java.util.function.Predicate
;
import
java.util.stream.Collectors
;
import
java.util.stream.Stream
;
import
org.apache.rocketmq.common.fastjson.GenericMapSuperclassDeserializer
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueuesInfo
;
import
org.apache.rocketmq.srvutil.ConcurrentHashMapUtil
;
import
static
java
.
util
.
Optional
.
ofNullable
;
public
class
LogicalQueuesInfoInBroker
extends
LogicalQueuesInfo
{
private
final
ConcurrentMap
<
Integer
,
ConcurrentNavigableMap
<
Long
,
LogicalQueueRouteData
>>
queueId2LogicalQueueMap
=
Maps
.
newConcurrentMap
();
public
LogicalQueuesInfoInBroker
()
{
}
public
LogicalQueuesInfoInBroker
(
LogicalQueuesInfoInBroker
other
)
{
this
(
other
,
null
);
}
// deep copy
public
LogicalQueuesInfoInBroker
(
LogicalQueuesInfoInBroker
other
,
Predicate
<
LogicalQueueRouteData
>
predicate
)
{
other
.
readLock
().
lock
();
try
{
for
(
Entry
<
Integer
,
List
<
LogicalQueueRouteData
>>
entry
:
other
.
entrySet
())
{
Stream
<
LogicalQueueRouteData
>
stream
=
entry
.
getValue
().
stream
();
if
(
predicate
!=
null
)
{
stream
=
stream
.
filter
(
predicate
);
}
this
.
put
(
entry
.
getKey
(),
stream
.
map
(
LogicalQueueRouteData:
:
new
).
collect
(
Collectors
.
toList
()));
}
}
finally
{
other
.
readLock
().
unlock
();
}
}
public
void
updateQueueRouteDataByQueueId
(
int
queueId
,
LogicalQueueRouteData
queueRouteData
)
{
if
(
queueRouteData
==
null
)
{
return
;
}
ConcurrentHashMapUtil
.
computeIfAbsent
(
queueId2LogicalQueueMap
,
queueId
,
k
->
new
ConcurrentSkipListMap
<>()).
put
(
queueRouteData
.
getOffsetDelta
(),
queueRouteData
);
}
/**
* find logical queue route data for message queues owned by this broker
*/
public
LogicalQueueRouteData
queryQueueRouteDataByQueueId
(
int
queueId
,
long
offset
)
{
ConcurrentNavigableMap
<
Long
,
LogicalQueueRouteData
>
m
=
this
.
queueId2LogicalQueueMap
.
get
(
queueId
);
if
(
m
==
null
||
m
.
isEmpty
())
{
return
null
;
}
Entry
<
Long
,
LogicalQueueRouteData
>
entry
=
m
.
floorEntry
(
offset
);
if
(
entry
==
null
)
{
return
null
;
}
return
entry
.
getValue
();
}
public
void
deleteQueueRouteData
(
LogicalQueueRouteData
logicalQueueRouteData
)
{
ConcurrentNavigableMap
<
Long
,
LogicalQueueRouteData
>
m
=
this
.
queueId2LogicalQueueMap
.
get
(
logicalQueueRouteData
.
getQueueId
());
if
(
m
!=
null
)
{
m
.
remove
(
logicalQueueRouteData
.
getOffsetDelta
(),
logicalQueueRouteData
);
}
}
public
LogicalQueueRouteData
nextAvailableLogicalRouteData
(
LogicalQueueRouteData
queueRouteData
,
Predicate
<
LogicalQueueRouteData
>
predicate
)
{
this
.
readLock
().
lock
();
try
{
List
<
LogicalQueueRouteData
>
queueRouteDataList
=
ofNullable
(
this
.
get
(
queueRouteData
.
getLogicalQueueIndex
())).
orElse
(
Collections
.
emptyList
());
int
idx
=
Collections
.
binarySearch
(
queueRouteDataList
,
queueRouteData
);
if
(
idx
>=
0
)
{
for
(
int
i
=
idx
+
1
,
size
=
queueRouteDataList
.
size
();
i
<
size
;
i
++)
{
LogicalQueueRouteData
tmp
=
queueRouteDataList
.
get
(
i
);
if
(
predicate
.
test
(
tmp
))
{
return
tmp
;
}
}
}
}
finally
{
this
.
readLock
().
unlock
();
}
return
null
;
}
static
{
// workaround https://github.com/alibaba/fastjson/issues/3730
ParserConfig
.
getGlobalInstance
().
putDeserializer
(
LogicalQueuesInfoInBroker
.
class
,
GenericMapSuperclassDeserializer
.
INSTANCE
);
}
}
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
浏览文件 @
238f9bcc
...
...
@@ -34,6 +34,7 @@ import org.apache.rocketmq.common.namesrv.RegisterBrokerResult;
import
org.apache.rocketmq.common.namesrv.TopAddressing
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
import
org.apache.rocketmq.common.protocol.body.ConsumerOffsetSerializeWrapper
;
import
org.apache.rocketmq.common.protocol.body.KVTable
;
import
org.apache.rocketmq.common.protocol.body.RegisterBrokerBody
;
...
...
@@ -48,6 +49,7 @@ import org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerReques
import
org.apache.rocketmq.common.protocol.route.TopicRouteData
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.InvokeCallback
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.RemotingClient
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
...
...
@@ -432,4 +434,23 @@ public class BrokerOuterAPI {
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
ClusterInfo
getBrokerClusterInfo
()
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
MQBrokerException
{
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
GET_BROKER_CLUSTER_INFO
,
null
);
RemotingCommand
response
=
this
.
remotingClient
.
invokeSync
(
null
,
request
,
3_000
);
assert
response
!=
null
;
switch
(
response
.
getCode
())
{
case
ResponseCode
.
SUCCESS
:
{
return
ClusterInfo
.
decode
(
response
.
getBody
(),
ClusterInfo
.
class
);
}
default
:
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
}
public
void
forwardRequest
(
String
brokerAddr
,
RemotingCommand
request
,
long
timeoutMillis
,
InvokeCallback
invokeCallback
)
throws
InterruptedException
,
RemotingSendRequestException
,
RemotingTimeoutException
,
RemotingTooMuchRequestException
,
RemotingConnectException
{
this
.
remotingClient
.
invokeAsync
(
brokerAddr
,
request
,
timeoutMillis
,
invokeCallback
);
}
}
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
浏览文件 @
238f9bcc
...
...
@@ -109,6 +109,11 @@ public abstract class AbstractPluginMessageStore implements MessageStore {
return
next
.
getMaxOffsetInQueue
(
topic
,
queueId
);
}
@Override
public
long
getMaxOffsetInQueue
(
String
topic
,
int
queueId
,
boolean
committed
)
{
return
next
.
getMaxOffsetInQueue
(
topic
,
queueId
,
committed
);
}
@Override
public
long
getMinOffsetInQueue
(
String
topic
,
int
queueId
)
{
return
next
.
getMinOffsetInQueue
(
topic
,
queueId
);
...
...
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
浏览文件 @
238f9bcc
...
...
@@ -16,19 +16,28 @@
*/
package
org.apache.rocketmq.broker.processor
;
import
com.google.common.collect.ImmutableList
;
import
com.google.common.collect.Maps
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.util.List
;
import
java.util.Locale
;
import
java.util.Map
;
import
java.util.Optional
;
import
java.util.Random
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.atomic.LongAdder
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageContext
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageHook
;
import
org.apache.rocketmq.
common.topic.TopicValidato
r
;
import
org.apache.rocketmq.
broker.topic.TopicConfigManage
r
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.TopicFilterType
;
import
org.apache.rocketmq.common.TopicQueueId
;
import
org.apache.rocketmq.common.constant.DBMsgConstants
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.constant.PermName
;
...
...
@@ -42,17 +51,23 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2
;
import
org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.sysflag.MessageSysFlag
;
import
org.apache.rocketmq.common.sysflag.TopicSysFlag
;
import
org.apache.rocketmq.common.topic.TopicValidator
;
import
org.apache.rocketmq.common.utils.ChannelUtil
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.CommandCustomHeader
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor
;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
import
org.apache.rocketmq.srvutil.ConcurrentHashMapUtil
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
org.apache.rocketmq.store.PutMessageResult
;
public
abstract
class
AbstractSendMessageProcessor
extends
AsyncNettyRequestProcessor
implements
NettyRequestProcessor
{
protected
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
...
...
@@ -63,6 +78,8 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
protected
final
SocketAddress
storeHost
;
private
List
<
SendMessageHook
>
sendMessageHookList
;
private
final
ConcurrentMap
<
TopicQueueId
,
LongAdder
>
inFlyWritingCounterMap
=
Maps
.
newConcurrentMap
();
public
AbstractSendMessageProcessor
(
final
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
this
.
storeHost
=
...
...
@@ -330,4 +347,158 @@ public abstract class AbstractSendMessageProcessor extends AsyncNettyRequestProc
public
boolean
rejectRequest
()
{
return
false
;
}
public
ConcurrentMap
<
TopicQueueId
,
LongAdder
>
getInFlyWritingCounterMap
()
{
return
inFlyWritingCounterMap
;
}
protected
LogicalQueueContext
buildLogicalQueueContext
(
String
topic
,
int
queueId
,
RemotingCommand
response
)
{
TopicConfigManager
topicConfigManager
=
this
.
brokerController
.
getTopicConfigManager
();
LogicalQueuesInfoInBroker
logicalQueuesInfo
=
topicConfigManager
.
selectLogicalQueuesInfo
(
topic
);
if
(
logicalQueuesInfo
==
null
)
{
return
noopLogicalQueueContext
;
}
// writable route data will has largest offset
LogicalQueueRouteData
curQueueRouteData
=
logicalQueuesInfo
.
queryQueueRouteDataByQueueId
(
queueId
,
Long
.
MAX_VALUE
);
if
(
curQueueRouteData
==
null
)
{
// topic enabled logical queue, but some message queues are not converted or being converted
String
msg
=
String
.
format
(
Locale
.
ENGLISH
,
"queueId %d not included in logical queue"
,
queueId
);
log
.
debug
(
"buildLogicalQueueContext unexpected error, topic {} {}"
,
topic
,
msg
);
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
msg
);
return
noopLogicalQueueContext
;
}
LongAdder
inFlyWritingCounter
=
ConcurrentHashMapUtil
.
computeIfAbsent
(
inFlyWritingCounterMap
,
new
TopicQueueId
(
topic
,
queueId
),
ignore
->
new
LongAdder
());
return
new
LogicalQueueContext
(
topic
,
queueId
,
logicalQueuesInfo
,
curQueueRouteData
,
inFlyWritingCounter
);
}
protected
class
LogicalQueueContext
{
private
final
String
topic
;
private
final
int
queueId
;
private
final
LogicalQueuesInfoInBroker
logicalQueuesInfo
;
private
final
LogicalQueueRouteData
curQueueRouteData
;
private
final
LongAdder
inFlyWritingCounter
;
public
LogicalQueueContext
(
String
topic
,
int
queueId
,
LogicalQueuesInfoInBroker
logicalQueuesInfo
,
LogicalQueueRouteData
curQueueRouteData
,
LongAdder
inFlyWritingCounter
)
{
this
.
topic
=
topic
;
this
.
queueId
=
queueId
;
this
.
logicalQueuesInfo
=
logicalQueuesInfo
;
this
.
curQueueRouteData
=
curQueueRouteData
;
this
.
inFlyWritingCounter
=
inFlyWritingCounter
;
}
public
CompletableFuture
<
RemotingCommand
>
hookBeforePut
(
ChannelHandlerContext
ctx
,
SendMessageRequestHeader
requestHeader
,
RemotingCommand
request
,
RemotingCommand
response
)
{
if
(
curQueueRouteData
.
isWritable
())
{
this
.
inFlyWritingCounter
.
increment
();
return
null
;
}
int
logicalQueueIdx
=
curQueueRouteData
.
getLogicalQueueIndex
();
List
<
LogicalQueueRouteData
>
queueRouteDataList
=
logicalQueuesInfo
.
get
(
logicalQueueIdx
);
LogicalQueueRouteData
writableQueueRouteData
=
null
;
for
(
int
i
=
queueRouteDataList
.
size
()
-
1
;
i
>=
0
;
i
--)
{
LogicalQueueRouteData
queueRouteData
=
queueRouteDataList
.
get
(
i
);
if
(
queueRouteData
.
isWritable
())
{
writableQueueRouteData
=
queueRouteData
;
break
;
}
}
if
(
writableQueueRouteData
==
null
)
{
response
.
setCode
(
ResponseCode
.
NO_PERMISSION
);
response
.
setRemark
(
String
.
format
(
Locale
.
ENGLISH
,
"broker[%s] topic[%s] queueId[%d] logicalQueueIdx[%d] not writable"
,
AbstractSendMessageProcessor
.
this
.
brokerController
.
getBrokerConfig
().
getBrokerIP1
(),
topic
,
queueId
,
logicalQueueIdx
));
return
CompletableFuture
.
completedFuture
(
response
);
}
if
((
Optional
.
ofNullable
(
requestHeader
.
getSysFlag
()).
orElse
(
0
)
&
MessageSysFlag
.
LOGICAL_QUEUE_FLAG
)
>
0
)
{
// new client, use redirect
response
.
setCode
(
ResponseCode
.
NO_PERMISSION
);
response
.
addExtField
(
MessageConst
.
PROPERTY_REDIRECT
,
"1"
);
response
.
setBody
(
RemotingSerializable
.
encode
(
ImmutableList
.
of
(
curQueueRouteData
,
writableQueueRouteData
)));
return
CompletableFuture
.
completedFuture
(
response
);
}
else
{
// old client, use forward
this
.
logicalQueueHookForward
(
ctx
,
writableQueueRouteData
,
requestHeader
,
request
,
response
);
}
if
(
response
.
getCode
()
!=
-
1
)
{
return
CompletableFuture
.
completedFuture
(
response
);
}
else
if
(
response
.
getCode
()
==
ResponseCode
.
ASYNC_AND_RETURN_NULL
)
{
return
CompletableFuture
.
completedFuture
(
null
);
}
return
null
;
}
private
void
logicalQueueHookForward
(
ChannelHandlerContext
ctx
,
LogicalQueueRouteData
writableQueueRouteData
,
SendMessageRequestHeader
requestHeader
,
RemotingCommand
request
,
RemotingCommand
response
)
{
response
.
setCode
(
ResponseCode
.
SUCCESS
);
requestHeader
.
setQueueId
(
writableQueueRouteData
.
getQueueId
());
request
.
writeCustomHeader
(
requestHeader
);
String
brokerName
=
writableQueueRouteData
.
getBrokerName
();
BrokerController
brokerController
=
AbstractSendMessageProcessor
.
this
.
brokerController
;
String
brokerAddr
=
brokerController
.
getBrokerAddrByName
(
brokerName
);
if
(
brokerAddr
==
null
)
{
log
.
warn
(
"getBrokerAddrByName brokerName={} got null, fallback to queueRouteData.getBrokerAddr()"
,
brokerName
);
brokerAddr
=
writableQueueRouteData
.
getBrokerAddr
();
}
if
(
brokerAddr
==
null
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
String
msg
=
String
.
format
(
Locale
.
ENGLISH
,
"unknown brokerName %s"
,
brokerName
);
response
.
setRemark
(
msg
);
log
.
warn
(
"logicalQueueHookForward can not look up brokerName={}: {}"
,
brokerName
,
requestHeader
);
return
;
}
try
{
String
finalBrokerAddr
=
brokerAddr
;
brokerController
.
getBrokerOuterAPI
().
forwardRequest
(
brokerAddr
,
request
,
brokerController
.
getBrokerConfig
().
getForwardTimeout
(),
responseFuture
->
{
RemotingCommand
forwardResponse
=
responseFuture
.
getResponseCommand
();
if
(
forwardResponse
==
null
)
{
forwardResponse
=
response
;
forwardResponse
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
if
(!
responseFuture
.
isSendRequestOK
())
{
forwardResponse
.
setRemark
(
String
.
format
(
Locale
.
ENGLISH
,
"send request failed to %s: %s"
,
finalBrokerAddr
,
responseFuture
.
getCause
()));
}
else
if
(
responseFuture
.
isTimeout
())
{
forwardResponse
.
setRemark
(
String
.
format
(
Locale
.
ENGLISH
,
"wait response from %s timeout: %dms"
,
finalBrokerAddr
,
responseFuture
.
getTimeoutMillis
()));
}
else
{
forwardResponse
.
setRemark
(
String
.
format
(
Locale
.
ENGLISH
,
"unknown reason. addr: %s, timeoutMillis: %d: %s"
,
finalBrokerAddr
,
responseFuture
.
getTimeoutMillis
(),
responseFuture
.
getCause
()));
}
}
else
{
CommandCustomHeader
customHeader
=
forwardResponse
.
readCustomHeader
();
if
(
customHeader
instanceof
SendMessageResponseHeader
)
{
SendMessageResponseHeader
responseHeader
=
(
SendMessageResponseHeader
)
customHeader
;
Integer
forwardQueueId
=
responseHeader
.
getQueueId
();
forwardResponse
.
addExtField
(
MessageConst
.
PROPERTY_FORWARD_QUEUE_ID
,
forwardQueueId
!=
null
?
Integer
.
toString
(
forwardQueueId
)
:
"null"
);
responseHeader
.
setQueueId
(
requestHeader
.
getQueueId
());
// queueOffset should not be changed since forwarded broker will add delta to it.
}
}
AbstractSendMessageProcessor
.
this
.
doResponse
(
ctx
,
request
,
forwardResponse
);
});
response
.
setCode
(
ResponseCode
.
ASYNC_AND_RETURN_NULL
);
}
catch
(
Exception
e
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"forward error"
);
log
.
warn
(
String
.
format
(
Locale
.
ENGLISH
,
"logicalQueueHookForward to %s error"
,
brokerAddr
),
e
);
}
}
public
void
hookAfterPut
(
CompletableFuture
<
PutMessageResult
>
putMessageResult
)
{
Optional
.
ofNullable
(
putMessageResult
).
orElse
(
CompletableFuture
.
completedFuture
(
null
)).
whenComplete
((
result
,
throwable
)
->
{
this
.
inFlyWritingCounter
.
decrement
();
});
}
}
private
final
LogicalQueueContext
noopLogicalQueueContext
=
new
LogicalQueueContext
(
null
,
0
,
null
,
null
,
null
)
{
@Override
public
CompletableFuture
<
RemotingCommand
>
hookBeforePut
(
ChannelHandlerContext
ctx
,
SendMessageRequestHeader
requestHeader
,
RemotingCommand
request
,
RemotingCommand
response
)
{
return
null
;
}
@Override
public
void
hookAfterPut
(
CompletableFuture
<
PutMessageResult
>
putMessageResult
)
{
}
};
}
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
浏览文件 @
238f9bcc
此差异已折叠。
点击以展开。
broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
浏览文件 @
238f9bcc
...
...
@@ -16,6 +16,7 @@
*/
package
org.apache.rocketmq.broker.processor
;
import
com.google.common.collect.ImmutableList
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelFuture
;
import
io.netty.channel.ChannelFutureListener
;
...
...
@@ -25,6 +26,7 @@ import java.nio.ByteBuffer;
import
java.util.List
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ConsumerGroupInfo
;
import
org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker
;
import
org.apache.rocketmq.broker.filter.ConsumerFilterData
;
import
org.apache.rocketmq.broker.filter.ConsumerFilterManager
;
import
org.apache.rocketmq.broker.filter.ExpressionForRetryMessageFilter
;
...
...
@@ -41,6 +43,7 @@ import org.apache.rocketmq.common.constant.PermName;
import
org.apache.rocketmq.common.filter.ExpressionType
;
import
org.apache.rocketmq.common.filter.FilterAPI
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
...
...
@@ -48,6 +51,8 @@ import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
import
org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.protocol.route.MessageQueueRouteState
;
import
org.apache.rocketmq.common.protocol.topic.OffsetMovedEvent
;
import
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig
;
import
org.apache.rocketmq.common.sysflag.MessageSysFlag
;
...
...
@@ -62,6 +67,7 @@ import org.apache.rocketmq.remoting.netty.AsyncNettyRequestProcessor;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
import
org.apache.rocketmq.remoting.netty.RequestTask
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
import
org.apache.rocketmq.store.GetMessageResult
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
org.apache.rocketmq.store.MessageFilter
;
...
...
@@ -126,23 +132,25 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
final
long
suspendTimeoutMillisLong
=
hasSuspendFlag
?
requestHeader
.
getSuspendTimeoutMillis
()
:
0
;
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
requestHeader
.
getTopic
());
String
topic
=
requestHeader
.
getTopic
();
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
topic
);
if
(
null
==
topicConfig
)
{
log
.
error
(
"the topic {} not exist, consumer: {}"
,
requestHeader
.
getTopic
()
,
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
log
.
error
(
"the topic {} not exist, consumer: {}"
,
topic
,
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
response
.
setCode
(
ResponseCode
.
TOPIC_NOT_EXIST
);
response
.
setRemark
(
String
.
format
(
"topic[%s] not exist, apply first please! %s"
,
requestHeader
.
getTopic
()
,
FAQUrl
.
suggestTodo
(
FAQUrl
.
APPLY_TOPIC_URL
)));
response
.
setRemark
(
String
.
format
(
"topic[%s] not exist, apply first please! %s"
,
topic
,
FAQUrl
.
suggestTodo
(
FAQUrl
.
APPLY_TOPIC_URL
)));
return
response
;
}
if
(!
PermName
.
isReadable
(
topicConfig
.
getPerm
()))
{
response
.
setCode
(
ResponseCode
.
NO_PERMISSION
);
response
.
setRemark
(
"the topic["
+
requestHeader
.
getTopic
()
+
"] pulling message is forbidden"
);
response
.
setRemark
(
"the topic["
+
topic
+
"] pulling message is forbidden"
);
return
response
;
}
if
(
requestHeader
.
getQueueId
()
<
0
||
requestHeader
.
getQueueId
()
>=
topicConfig
.
getReadQueueNums
())
{
int
queueId
=
requestHeader
.
getQueueId
();
if
(
queueId
<
0
||
queueId
>=
topicConfig
.
getReadQueueNums
())
{
String
errorInfo
=
String
.
format
(
"queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]"
,
requestHeader
.
getQueueId
(),
requestHeader
.
getTopic
()
,
topicConfig
.
getReadQueueNums
(),
channel
.
remoteAddress
());
queueId
,
topic
,
topicConfig
.
getReadQueueNums
(),
channel
.
remoteAddress
());
log
.
warn
(
errorInfo
);
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
errorInfo
);
...
...
@@ -154,11 +162,11 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if
(
hasSubscriptionFlag
)
{
try
{
subscriptionData
=
FilterAPI
.
build
(
requestHeader
.
getTopic
()
,
requestHeader
.
getSubscription
(),
requestHeader
.
getExpressionType
()
topic
,
requestHeader
.
getSubscription
(),
requestHeader
.
getExpressionType
()
);
if
(!
ExpressionType
.
isTagType
(
subscriptionData
.
getExpressionType
()))
{
consumerFilterData
=
ConsumerFilterManager
.
build
(
requestHeader
.
getTopic
()
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getSubscription
(),
topic
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getSubscription
(),
requestHeader
.
getExpressionType
(),
requestHeader
.
getSubVersion
()
);
assert
consumerFilterData
!=
null
;
...
...
@@ -187,9 +195,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return
response
;
}
subscriptionData
=
consumerGroupInfo
.
findSubscriptionData
(
requestHeader
.
getTopic
()
);
subscriptionData
=
consumerGroupInfo
.
findSubscriptionData
(
topic
);
if
(
null
==
subscriptionData
)
{
log
.
warn
(
"the consumer's subscription not exist, group: {}, topic:{}"
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
()
);
log
.
warn
(
"the consumer's subscription not exist, group: {}, topic:{}"
,
requestHeader
.
getConsumerGroup
(),
topic
);
response
.
setCode
(
ResponseCode
.
SUBSCRIPTION_NOT_EXIST
);
response
.
setRemark
(
"the consumer's subscription not exist"
+
FAQUrl
.
suggestTodo
(
FAQUrl
.
SAME_GROUP_DIFFERENT_TOPIC
));
return
response
;
...
...
@@ -203,7 +211,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
return
response
;
}
if
(!
ExpressionType
.
isTagType
(
subscriptionData
.
getExpressionType
()))
{
consumerFilterData
=
this
.
brokerController
.
getConsumerFilterManager
().
get
(
requestHeader
.
getTopic
()
,
consumerFilterData
=
this
.
brokerController
.
getConsumerFilterManager
().
get
(
topic
,
requestHeader
.
getConsumerGroup
());
if
(
consumerFilterData
==
null
)
{
response
.
setCode
(
ResponseCode
.
FILTER_DATA_NOT_EXIST
);
...
...
@@ -212,7 +220,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
}
if
(
consumerFilterData
.
getClientVersion
()
<
requestHeader
.
getSubVersion
())
{
log
.
warn
(
"The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}"
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
()
,
consumerFilterData
.
getClientVersion
(),
requestHeader
.
getSubVersion
());
requestHeader
.
getConsumerGroup
(),
topic
,
consumerFilterData
.
getClientVersion
(),
requestHeader
.
getSubVersion
());
response
.
setCode
(
ResponseCode
.
FILTER_DATA_NOT_LATEST
);
response
.
setRemark
(
"the consumer's consumer filter data not latest"
);
return
response
;
...
...
@@ -236,13 +244,74 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
this
.
brokerController
.
getConsumerFilterManager
());
}
long
offset
=
requestHeader
.
getQueueOffset
();
int
maxMsgNums
=
requestHeader
.
getMaxMsgNums
();
LogicalQueuesInfoInBroker
logicalQueuesInfo
=
this
.
brokerController
.
getTopicConfigManager
().
selectLogicalQueuesInfo
(
topic
);
LogicalQueueRouteData
queueRouteData
=
null
;
if
(
logicalQueuesInfo
!=
null
)
{
int
responseErrorCode
=
ResponseCode
.
SUCCESS
;
queueRouteData
=
logicalQueuesInfo
.
queryQueueRouteDataByQueueId
(
queueId
,
offset
);
if
(
queueRouteData
!=
null
)
{
if
(
queueRouteData
.
isWriteOnly
())
{
responseErrorCode
=
ResponseCode
.
PULL_NOT_FOUND
;
response
.
setRemark
(
"logical queue write only"
);
}
else
if
(
queueRouteData
.
isExpired
())
{
responseErrorCode
=
ResponseCode
.
PULL_RETRY_IMMEDIATELY
;
response
.
setRemark
(
"logical queue expired"
);
prepareRedirectResponse
(
response
,
logicalQueuesInfo
,
queueRouteData
);
}
else
if
(
MessageQueueRouteState
.
ReadOnly
.
equals
(
queueRouteData
.
getState
())
&&
queueRouteData
.
getOffsetMax
()
>=
0
)
{
if
(
offset
>=
queueRouteData
.
getOffsetMax
())
{
responseErrorCode
=
ResponseCode
.
PULL_RETRY_IMMEDIATELY
;
response
.
setRemark
(
"queue offset exceed offsetMax"
);
prepareRedirectResponse
(
response
,
logicalQueuesInfo
,
queueRouteData
);
}
else
if
(
offset
+
maxMsgNums
>
queueRouteData
.
getOffsetMax
())
{
if
((
queueRouteData
.
getOffsetMax
()
-
1
<=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQueue
(
topic
,
queueId
))
&&
(
this
.
brokerController
.
getMessageStore
().
getCommitLogOffsetInQueue
(
topic
,
queueId
,
queueRouteData
.
getOffsetMax
()
-
1
)
<
this
.
brokerController
.
getMessageStore
().
getMinPhyOffset
()))
{
responseErrorCode
=
ResponseCode
.
PULL_RETRY_IMMEDIATELY
;
response
.
setRemark
(
"queue offset removed"
);
prepareRedirectResponse
(
response
,
logicalQueuesInfo
,
queueRouteData
);
}
else
{
maxMsgNums
=
(
int
)
(
queueRouteData
.
getOffsetMax
()
-
offset
);
if
(
maxMsgNums
<=
0
)
{
responseErrorCode
=
ResponseCode
.
PULL_RETRY_IMMEDIATELY
;
response
.
setRemark
(
"queue offset out of range"
);
prepareRedirectResponse
(
response
,
logicalQueuesInfo
,
queueRouteData
);
}
}
}
}
}
else
{
responseErrorCode
=
ResponseCode
.
PULL_RETRY_IMMEDIATELY
;
response
.
setRemark
(
"no suitable queue"
);
response
.
addExtField
(
MessageConst
.
PROPERTY_REDIRECT
,
"1"
);
// instruct client to refresh all
response
.
setBody
(
null
);
queueRouteData
=
logicalQueuesInfo
.
queryQueueRouteDataByQueueId
(
queueId
,
0L
);
}
if
(
responseErrorCode
!=
ResponseCode
.
SUCCESS
)
{
response
.
setCode
(
responseErrorCode
);
responseHeader
.
setMinOffset
(
offset
);
responseHeader
.
setMaxOffset
(
queueRouteData
!=
null
?
queueRouteData
.
getOffsetMax
()
:
offset
);
responseHeader
.
setNextBeginOffset
(
queueRouteData
!=
null
?
queueRouteData
.
getOffsetMax
()
:
offset
);
responseHeader
.
setSuggestWhichBrokerId
(
MixAll
.
MASTER_ID
);
return
response
;
}
}
final
GetMessageResult
getMessageResult
=
this
.
brokerController
.
getMessageStore
().
getMessage
(
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
()
,
requestHeader
.
getQueueId
(),
requestHeader
.
getQueueOffset
(),
requestHeader
.
getMaxMsgNums
()
,
messageFilter
);
this
.
brokerController
.
getMessageStore
().
getMessage
(
requestHeader
.
getConsumerGroup
(),
topic
,
queueId
,
offset
,
maxMsgNums
,
messageFilter
);
if
(
getMessageResult
!=
null
)
{
response
.
setRemark
(
getMessageResult
.
getStatus
().
name
());
responseHeader
.
setNextBeginOffset
(
getMessageResult
.
getNextBeginOffset
());
long
nextBeginOffset
=
getMessageResult
.
getNextBeginOffset
();
if
(
queueRouteData
!=
null
&&
queueRouteData
.
getOffsetMax
()
>=
0
&&
nextBeginOffset
>
queueRouteData
.
getOffsetMax
())
{
// prevent from pulling messages from next logical queue route data
nextBeginOffset
=
queueRouteData
.
getOffsetMax
();
}
responseHeader
.
setNextBeginOffset
(
nextBeginOffset
);
responseHeader
.
setMinOffset
(
getMessageResult
.
getMinOffset
());
// this does not need to be modified since it's not an accurate value under logical queue.
responseHeader
.
setMaxOffset
(
getMessageResult
.
getMaxOffset
());
if
(
getMessageResult
.
isSuggestPullingFromSlave
())
{
...
...
@@ -291,9 +360,9 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
// XXX: warn and notify me
log
.
info
(
"the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}"
,
requestHeader
.
getQueueOffset
(),
getMessageResult
.
getNextBeginOffset
()
,
requestHeader
.
getTopic
()
,
requestHeader
.
getQueueId
()
,
nextBeginOffset
,
topic
,
queueId
,
requestHeader
.
getConsumerGroup
()
);
}
else
{
...
...
@@ -318,7 +387,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
case
OFFSET_TOO_SMALL:
response
.
setCode
(
ResponseCode
.
PULL_OFFSET_MOVED
);
log
.
info
(
"the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}"
,
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
()
,
requestHeader
.
getQueueOffset
(),
requestHeader
.
getConsumerGroup
(),
topic
,
requestHeader
.
getQueueOffset
(),
getMessageResult
.
getMinOffset
(),
channel
.
remoteAddress
());
break
;
default
:
...
...
@@ -329,8 +398,8 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
if
(
this
.
hasConsumeMessageHook
())
{
ConsumeMessageContext
context
=
new
ConsumeMessageContext
();
context
.
setConsumerGroup
(
requestHeader
.
getConsumerGroup
());
context
.
setTopic
(
requestHeader
.
getTopic
()
);
context
.
setQueueId
(
requestHeader
.
getQueueId
()
);
context
.
setTopic
(
topic
);
context
.
setQueueId
(
queueId
);
String
owner
=
request
.
getExtFields
().
get
(
BrokerStatsManager
.
COMMERCIAL_OWNER
);
...
...
@@ -414,9 +483,6 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
pollingTimeMills
=
this
.
brokerController
.
getBrokerConfig
().
getShortPollingTimeMills
();
}
String
topic
=
requestHeader
.
getTopic
();
long
offset
=
requestHeader
.
getQueueOffset
();
int
queueId
=
requestHeader
.
getQueueId
();
PullRequest
pullRequest
=
new
PullRequest
(
request
,
channel
,
pollingTimeMills
,
this
.
brokerController
.
getMessageStore
().
now
(),
offset
,
subscriptionData
,
messageFilter
);
this
.
brokerController
.
getPullRequestHoldService
().
suspendPullRequest
(
topic
,
queueId
,
pullRequest
);
...
...
@@ -424,6 +490,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
break
;
}
if
(
queueRouteData
!=
null
)
{
logicalQueuesInfo
.
readLock
().
lock
();
try
{
List
<
LogicalQueueRouteData
>
queueRouteDataList
=
logicalQueuesInfo
.
get
(
queueRouteData
.
getLogicalQueueIndex
());
MessageQueue
latestMessageQueue
=
queueRouteDataList
.
get
(
queueRouteDataList
.
size
()
-
1
).
getMessageQueue
();
if
(!
latestMessageQueue
.
getBrokerName
().
equals
(
brokerController
.
getBrokerConfig
().
getBrokerName
())
||
latestMessageQueue
.
getQueueId
()
!=
queueId
)
{
// There are other newer message queue, instruct client to refresh meta-data to access these
prepareRedirectResponse
(
response
,
logicalQueuesInfo
,
queueRouteData
);
}
}
finally
{
logicalQueuesInfo
.
readLock
().
unlock
();
}
}
case
ResponseCode
.
PULL_RETRY_IMMEDIATELY
:
break
;
case
ResponseCode
.
PULL_OFFSET_MOVED
:
...
...
@@ -438,7 +518,7 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
event
.
setConsumerGroup
(
requestHeader
.
getConsumerGroup
());
event
.
setMessageQueue
(
mq
);
event
.
setOffsetRequest
(
requestHeader
.
getQueueOffset
());
event
.
setOffsetNew
(
getMessageResult
.
getNextBeginOffset
()
);
event
.
setOffsetNew
(
nextBeginOffset
);
this
.
generateOffsetMovedEvent
(
event
);
log
.
warn
(
"PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}"
,
...
...
@@ -467,11 +547,20 @@ public class PullMessageProcessor extends AsyncNettyRequestProcessor implements
&&
this
.
brokerController
.
getMessageStoreConfig
().
getBrokerRole
()
!=
BrokerRole
.
SLAVE
;
if
(
storeOffsetEnable
)
{
this
.
brokerController
.
getConsumerOffsetManager
().
commitOffset
(
RemotingHelper
.
parseChannelRemoteAddr
(
channel
),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
()
,
requestHeader
.
getCommitOffset
());
requestHeader
.
getConsumerGroup
(),
topic
,
queueId
,
requestHeader
.
getCommitOffset
());
}
return
response
;
}
private
void
prepareRedirectResponse
(
RemotingCommand
response
,
LogicalQueuesInfoInBroker
logicalQueuesInfo
,
LogicalQueueRouteData
queueRouteData
)
{
LogicalQueueRouteData
nextReadableLogicalQueueRouteData
=
logicalQueuesInfo
.
nextAvailableLogicalRouteData
(
queueRouteData
,
LogicalQueueRouteData:
:
isReadable
);
if
(
nextReadableLogicalQueueRouteData
!=
null
)
{
response
.
addExtField
(
MessageConst
.
PROPERTY_REDIRECT
,
"1"
);
response
.
setBody
(
RemotingSerializable
.
encode
(
ImmutableList
.
of
(
queueRouteData
,
nextReadableLogicalQueueRouteData
)));
}
}
public
boolean
hasConsumeMessageHook
()
{
return
consumeMessageHookList
!=
null
&&
!
this
.
consumeMessageHookList
.
isEmpty
();
}
...
...
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
浏览文件 @
238f9bcc
...
...
@@ -287,6 +287,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor
.
putProperty
(
msgInner
,
MessageConst
.
PROPERTY_CLUSTER
,
clusterName
);
msgInner
.
setPropertiesString
(
MessageDecoder
.
messageProperties2String
(
msgInner
.
getProperties
()));
LogicalQueueContext
logicalQueueContext
=
super
.
buildLogicalQueueContext
(
msgInner
.
getTopic
(),
msgInner
.
getQueueId
(),
response
);
CompletableFuture
<
RemotingCommand
>
future
=
logicalQueueContext
.
hookBeforePut
(
ctx
,
requestHeader
,
request
,
response
);
if
(
future
!=
null
)
{
return
future
;
}
CompletableFuture
<
PutMessageResult
>
putMessageResult
=
null
;
Map
<
String
,
String
>
origProps
=
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
());
String
transFlag
=
origProps
.
get
(
MessageConst
.
PROPERTY_TRANSACTION_PREPARED
);
...
...
@@ -296,12 +302,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
response
.
setRemark
(
"the broker["
+
this
.
brokerController
.
getBrokerConfig
().
getBrokerIP1
()
+
"] sending transaction message is forbidden"
);
logicalQueueContext
.
hookAfterPut
(
null
);
return
CompletableFuture
.
completedFuture
(
response
);
}
putMessageResult
=
this
.
brokerController
.
getTransactionalMessageService
().
asyncPrepareMessage
(
msgInner
);
}
else
{
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
asyncPutMessage
(
msgInner
);
}
logicalQueueContext
.
hookAfterPut
(
putMessageResult
);
return
handlePutMessageResultFuture
(
putMessageResult
,
response
,
request
,
msgInner
,
responseHeader
,
mqtraceContext
,
ctx
,
queueIdInt
);
}
...
...
@@ -362,82 +370,6 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return
true
;
}
private
RemotingCommand
sendMessage
(
final
ChannelHandlerContext
ctx
,
final
RemotingCommand
request
,
final
SendMessageContext
sendMessageContext
,
final
SendMessageRequestHeader
requestHeader
)
throws
RemotingCommandException
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
SendMessageResponseHeader
.
class
);
final
SendMessageResponseHeader
responseHeader
=
(
SendMessageResponseHeader
)
response
.
readCustomHeader
();
response
.
setOpaque
(
request
.
getOpaque
());
response
.
addExtField
(
MessageConst
.
PROPERTY_MSG_REGION
,
this
.
brokerController
.
getBrokerConfig
().
getRegionId
());
response
.
addExtField
(
MessageConst
.
PROPERTY_TRACE_SWITCH
,
String
.
valueOf
(
this
.
brokerController
.
getBrokerConfig
().
isTraceOn
()));
log
.
debug
(
"receive SendMessage request command, {}"
,
request
);
final
long
startTimstamp
=
this
.
brokerController
.
getBrokerConfig
().
getStartAcceptSendRequestTimeStamp
();
if
(
this
.
brokerController
.
getMessageStore
().
now
()
<
startTimstamp
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
String
.
format
(
"broker unable to service, until %s"
,
UtilAll
.
timeMillisToHumanString2
(
startTimstamp
)));
return
response
;
}
response
.
setCode
(-
1
);
super
.
msgCheck
(
ctx
,
requestHeader
,
response
);
if
(
response
.
getCode
()
!=
-
1
)
{
return
response
;
}
final
byte
[]
body
=
request
.
getBody
();
int
queueIdInt
=
requestHeader
.
getQueueId
();
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
requestHeader
.
getTopic
());
if
(
queueIdInt
<
0
)
{
queueIdInt
=
Math
.
abs
(
this
.
random
.
nextInt
()
%
99999999
)
%
topicConfig
.
getWriteQueueNums
();
}
MessageExtBrokerInner
msgInner
=
new
MessageExtBrokerInner
();
msgInner
.
setTopic
(
requestHeader
.
getTopic
());
msgInner
.
setQueueId
(
queueIdInt
);
if
(!
handleRetryAndDLQ
(
requestHeader
,
response
,
request
,
msgInner
,
topicConfig
))
{
return
response
;
}
msgInner
.
setBody
(
body
);
msgInner
.
setFlag
(
requestHeader
.
getFlag
());
MessageAccessor
.
setProperties
(
msgInner
,
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
()));
msgInner
.
setBornTimestamp
(
requestHeader
.
getBornTimestamp
());
msgInner
.
setBornHost
(
ctx
.
channel
().
remoteAddress
());
msgInner
.
setStoreHost
(
this
.
getStoreHost
());
msgInner
.
setReconsumeTimes
(
requestHeader
.
getReconsumeTimes
()
==
null
?
0
:
requestHeader
.
getReconsumeTimes
());
String
clusterName
=
this
.
brokerController
.
getBrokerConfig
().
getBrokerClusterName
();
MessageAccessor
.
putProperty
(
msgInner
,
MessageConst
.
PROPERTY_CLUSTER
,
clusterName
);
msgInner
.
setPropertiesString
(
MessageDecoder
.
messageProperties2String
(
msgInner
.
getProperties
()));
PutMessageResult
putMessageResult
=
null
;
Map
<
String
,
String
>
oriProps
=
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
());
String
traFlag
=
oriProps
.
get
(
MessageConst
.
PROPERTY_TRANSACTION_PREPARED
);
if
(
traFlag
!=
null
&&
Boolean
.
parseBoolean
(
traFlag
)
&&
!(
msgInner
.
getReconsumeTimes
()
>
0
&&
msgInner
.
getDelayTimeLevel
()
>
0
))
{
//For client under version 4.6.1
if
(
this
.
brokerController
.
getBrokerConfig
().
isRejectTransactionMessage
())
{
response
.
setCode
(
ResponseCode
.
NO_PERMISSION
);
response
.
setRemark
(
"the broker["
+
this
.
brokerController
.
getBrokerConfig
().
getBrokerIP1
()
+
"] sending transaction message is forbidden"
);
return
response
;
}
putMessageResult
=
this
.
brokerController
.
getTransactionalMessageService
().
prepareMessage
(
msgInner
);
}
else
{
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessage
(
msgInner
);
}
return
handlePutMessageResult
(
putMessageResult
,
response
,
request
,
msgInner
,
responseHeader
,
sendMessageContext
,
ctx
,
queueIdInt
);
}
private
RemotingCommand
handlePutMessageResult
(
PutMessageResult
putMessageResult
,
RemotingCommand
response
,
RemotingCommand
request
,
MessageExt
msg
,
SendMessageResponseHeader
responseHeader
,
SendMessageContext
sendMessageContext
,
ChannelHandlerContext
ctx
,
...
...
@@ -587,7 +519,16 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
String
clusterName
=
this
.
brokerController
.
getBrokerConfig
().
getBrokerClusterName
();
MessageAccessor
.
putProperty
(
messageExtBatch
,
MessageConst
.
PROPERTY_CLUSTER
,
clusterName
);
LogicalQueueContext
logicalQueueContext
=
super
.
buildLogicalQueueContext
(
messageExtBatch
.
getTopic
(),
messageExtBatch
.
getQueueId
(),
response
);
CompletableFuture
<
RemotingCommand
>
future
=
logicalQueueContext
.
hookBeforePut
(
ctx
,
requestHeader
,
request
,
response
);
if
(
future
!=
null
)
{
return
future
;
}
CompletableFuture
<
PutMessageResult
>
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
asyncPutMessages
(
messageExtBatch
);
logicalQueueContext
.
hookAfterPut
(
putMessageResult
);
return
handlePutMessageResultFuture
(
putMessageResult
,
response
,
request
,
messageExtBatch
,
responseHeader
,
mqtraceContext
,
ctx
,
queueIdInt
);
}
...
...
This diff is collapsed.
Click to expand it.
broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
浏览文件 @
238f9bcc
...
...
@@ -17,16 +17,20 @@
package
org.apache.rocketmq.broker.topic
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map.Entry
;
import
java.util.Objects
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.locks.Lock
;
import
java.util.concurrent.locks.ReentrantLock
;
import
java.util.stream.Collectors
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.BrokerPathConfigHelper
;
import
org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker
;
import
org.apache.rocketmq.common.ConfigManager
;
import
org.apache.rocketmq.common.DataVersion
;
import
org.apache.rocketmq.common.MixAll
;
...
...
@@ -35,10 +39,15 @@ import org.apache.rocketmq.common.constant.LoggerName;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.common.protocol.body.KVTable
;
import
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.sysflag.TopicSysFlag
;
import
org.apache.rocketmq.common.topic.TopicValidator
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.srvutil.ConcurrentHashMapUtil
;
import
org.apache.rocketmq.store.CleanFilesHook
;
import
org.apache.rocketmq.store.DefaultMessageStore
;
import
org.apache.rocketmq.store.MessageStore
;
public
class
TopicConfigManager
extends
ConfigManager
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
...
...
@@ -52,8 +61,19 @@ public class TopicConfigManager extends ConfigManager {
private
final
DataVersion
dataVersion
=
new
DataVersion
();
private
transient
BrokerController
brokerController
;
public
TopicConfigManager
()
{
}
private
final
ConcurrentMap
<
String
,
LogicalQueuesInfoInBroker
>
logicalQueuesInfoTable
=
new
ConcurrentHashMap
<>();
private
final
CleanFilesHook
logicalQueueCleanHook
=
new
CleanFilesHook
()
{
@Override
public
void
execute
(
DefaultMessageStore
defaultMessageStore
,
long
deleteCount
)
{
if
(
deleteCount
==
0
)
{
return
;
}
TopicConfigManager
.
this
.
logicalQueueClean
();
}
@Override
public
String
getName
()
{
return
TopicConfigManager
.
class
.
getSimpleName
()
+
".logicalQueueCleanHook"
;
}
};
public
TopicConfigManager
(
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
...
...
@@ -362,7 +382,7 @@ public class TopicConfigManager extends ConfigManager {
this
.
dataVersion
.
nextVersion
();
this
.
persist
();
this
.
persist
(
topicConfig
.
getTopicName
(),
topicConfig
);
}
public
void
updateOrderTopicConfig
(
final
KVTable
orderKVTableFromNs
)
{
...
...
@@ -421,6 +441,8 @@ public class TopicConfigManager extends ConfigManager {
public
TopicConfigSerializeWrapper
buildTopicConfigSerializeWrapper
()
{
TopicConfigSerializeWrapper
topicConfigSerializeWrapper
=
new
TopicConfigSerializeWrapper
();
topicConfigSerializeWrapper
.
setTopicConfigTable
(
this
.
topicConfigTable
);
String
brokerName
=
this
.
brokerController
.
getBrokerConfig
().
getBrokerName
();
topicConfigSerializeWrapper
.
setLogicalQueuesInfoMap
(
this
.
logicalQueuesInfoTable
.
entrySet
().
stream
().
collect
(
Collectors
.
toMap
(
Map
.
Entry
::
getKey
,
e
->
new
LogicalQueuesInfoInBroker
(
e
.
getValue
(),
data
->
Objects
.
equals
(
data
.
getBrokerName
(),
brokerName
)))));
topicConfigSerializeWrapper
.
setDataVersion
(
this
.
dataVersion
);
return
topicConfigSerializeWrapper
;
}
...
...
@@ -452,6 +474,7 @@ public class TopicConfigManager extends ConfigManager {
public
String
encode
(
final
boolean
prettyFormat
)
{
TopicConfigSerializeWrapper
topicConfigSerializeWrapper
=
new
TopicConfigSerializeWrapper
();
topicConfigSerializeWrapper
.
setTopicConfigTable
(
this
.
topicConfigTable
);
topicConfigSerializeWrapper
.
setLogicalQueuesInfoMap
(
this
.
logicalQueuesInfoTable
.
entrySet
().
stream
().
collect
(
Collectors
.
toMap
(
Map
.
Entry
::
getKey
,
e
->
new
LogicalQueuesInfoInBroker
(
e
.
getValue
()))));
topicConfigSerializeWrapper
.
setDataVersion
(
this
.
dataVersion
);
return
topicConfigSerializeWrapper
.
toJson
(
prettyFormat
);
}
...
...
@@ -471,4 +494,92 @@ public class TopicConfigManager extends ConfigManager {
public
ConcurrentMap
<
String
,
TopicConfig
>
getTopicConfigTable
()
{
return
topicConfigTable
;
}
public
LogicalQueuesInfoInBroker
selectLogicalQueuesInfo
(
String
topicName
)
{
return
this
.
logicalQueuesInfoTable
.
get
(
topicName
);
}
public
LogicalQueuesInfoInBroker
getOrCreateLogicalQueuesInfo
(
String
topicName
)
{
return
ConcurrentHashMapUtil
.
computeIfAbsent
(
this
.
logicalQueuesInfoTable
,
topicName
,
ignored
->
new
LogicalQueuesInfoInBroker
());
}
public
boolean
replaceTopicConfig
(
String
topic
,
TopicConfig
oldTopicConfig
,
TopicConfig
newTopicConfig
)
{
boolean
ok
=
this
.
topicConfigTable
.
replace
(
topic
,
oldTopicConfig
,
newTopicConfig
);
if
(
ok
)
{
this
.
dataVersion
.
nextVersion
();
persist
(
topic
,
newTopicConfig
);
}
return
ok
;
}
public
CleanFilesHook
getLogicalQueueCleanHook
()
{
return
logicalQueueCleanHook
;
}
void
logicalQueueClean
()
{
String
brokerName
=
this
.
brokerController
.
getBrokerConfig
().
getBrokerName
();
MessageStore
messageStore
=
this
.
brokerController
.
getMessageStore
();
for
(
Entry
<
String
,
LogicalQueuesInfoInBroker
>
entry
:
this
.
logicalQueuesInfoTable
.
entrySet
())
{
String
topic
=
entry
.
getKey
();
LogicalQueuesInfoInBroker
logicalQueuesInfo
=
entry
.
getValue
();
Lock
readLock
=
logicalQueuesInfo
.
readLock
();
Lock
writeLock
=
logicalQueuesInfo
.
writeLock
();
boolean
changed
=
false
;
readLock
.
lock
();
try
{
for
(
List
<
LogicalQueueRouteData
>
list
:
logicalQueuesInfo
.
values
())
{
while
(!
list
.
isEmpty
())
{
LogicalQueueRouteData
logicalQueueRouteData
=
list
.
get
(
0
);
String
brokerBelongs
;
if
(
brokerName
.
equals
(
logicalQueueRouteData
.
getBrokerName
()))
{
if
(
logicalQueueRouteData
.
isWritable
())
{
break
;
}
boolean
canRemove
=
logicalQueueRouteData
.
isExpired
()
||
logicalQueueRouteData
.
getMessagesCount
()
==
0
;
if
(!
canRemove
)
{
// do not use getMinOffsetInQueue method, since it is using ConsumeQueue data, but not CommitLog, CQ data is not accurate after CommitLog cleaning.
long
commitLogOffset
=
messageStore
.
getCommitLogOffsetInQueue
(
topic
,
logicalQueueRouteData
.
getQueueId
(),
logicalQueueRouteData
.
getOffsetMax
()
-
1
);
canRemove
=
commitLogOffset
==
0
||
messageStore
.
getMinPhyOffset
()
>
commitLogOffset
;
}
if
(!
canRemove
)
{
break
;
}
brokerBelongs
=
"self"
;
}
else
{
brokerBelongs
=
"other"
;
}
readLock
.
unlock
();
writeLock
.
lock
();
try
{
list
.
remove
(
0
);
}
finally
{
readLock
.
lock
();
writeLock
.
unlock
();
}
log
.
info
(
"logicalQueueClean remove {} broker {}"
,
brokerBelongs
,
logicalQueueRouteData
);
changed
=
true
;
}
}
if
(
changed
)
{
logicalQueuesInfo
=
new
LogicalQueuesInfoInBroker
(
logicalQueuesInfo
);
}
}
finally
{
readLock
.
unlock
();
}
if
(
changed
)
{
this
.
dataVersion
.
nextVersion
();
this
.
persist
(
topic
,
logicalQueuesInfo
);
this
.
brokerController
.
registerIncrementBrokerData
(
this
.
selectTopicConfig
(
topic
),
this
.
dataVersion
);
log
.
info
(
"registerIncrementBrokerData because logicalQueueClean: {}"
,
topic
);
}
}
}
public
void
deleteQueueRouteData
(
String
topic
)
{
if
(
this
.
logicalQueuesInfoTable
.
remove
(
topic
)
!=
null
)
{
log
.
info
(
"delete queueRouteData config OK, topic: {}"
,
topic
);
this
.
dataVersion
.
nextVersion
();
persist
(
topic
,
(
LogicalQueuesInfoInBroker
)
null
);
}
}
}
This diff is collapsed.
Click to expand it.
broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
浏览文件 @
238f9bcc
...
...
@@ -22,19 +22,26 @@ import com.google.common.collect.Iterables;
import
com.google.common.collect.Lists
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.lang.reflect.Field
;
import
java.util.Collections
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.broker.out.BrokerOuterAPI
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.namesrv.RegisterBrokerResult
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
import
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper
;
import
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader
;
import
org.apache.rocketmq.common.protocol.route.BrokerData
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyRemotingClient
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
import
org.apache.rocketmq.store.MessageStore
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.junit.Test
;
...
...
@@ -50,6 +57,8 @@ import static org.junit.Assert.assertTrue;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyString
;
import
static
org
.
mockito
.
ArgumentMatchers
.
argThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
isNull
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
...
...
@@ -182,6 +191,26 @@ public class BrokerOuterAPITest {
assertEquals
(
2
,
registerBrokerResultList
.
size
());
}
@Test
public
void
testGetBrokerClusterInfo
()
throws
Exception
{
init
();
brokerOuterAPI
.
start
();
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
ClusterInfo
want
=
new
ClusterInfo
();
want
.
setBrokerAddrTable
(
new
HashMap
<>(
Collections
.
singletonMap
(
"key"
,
new
BrokerData
(
"cluster"
,
"broker"
,
new
HashMap
<>(
Collections
.
singletonMap
(
MixAll
.
MASTER_ID
,
"127.0.0.1:10911"
))))));
response
.
setBody
(
RemotingSerializable
.
encode
(
want
));
when
(
nettyRemotingClient
.
invokeSync
(
isNull
(),
argThat
(
argument
->
argument
.
getCode
()
==
RequestCode
.
GET_BROKER_CLUSTER_INFO
),
anyLong
())).
thenReturn
(
response
);
ClusterInfo
got
=
brokerOuterAPI
.
getBrokerClusterInfo
();
assertEquals
(
want
,
got
);
}
private
RemotingCommand
buildResponse
(
Boolean
changed
)
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
QueryDataVersionResponseHeader
.
class
);
final
QueryDataVersionResponseHeader
responseHeader
=
(
QueryDataVersionResponseHeader
)
response
.
readCustomHeader
();
...
...
This diff is collapsed.
Click to expand it.
broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
浏览文件 @
238f9bcc
此差异已折叠。
点击以展开。
broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
浏览文件 @
238f9bcc
...
...
@@ -20,24 +20,31 @@ import io.netty.channel.Channel;
import
io.netty.channel.ChannelHandlerContext
;
import
java.net.InetSocketAddress
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Set
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ClientChannelInfo
;
import
org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker
;
import
org.apache.rocketmq.broker.filter.ExpressionMessageFilter
;
import
org.apache.rocketmq.broker.mqtrace.ConsumeMessageContext
;
import
org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.PullMessageResponseHeader
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumeType
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumerData
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.protocol.route.MessageQueueRouteState
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
...
...
@@ -46,6 +53,7 @@ import org.apache.rocketmq.store.GetMessageResult;
import
org.apache.rocketmq.store.GetMessageStatus
;
import
org.apache.rocketmq.store.MessageStore
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.assertj.core.util.Lists
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
...
...
@@ -53,11 +61,15 @@ import org.mockito.Mock;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
java
.
util
.
Optional
.
ofNullable
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyInt
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyString
;
import
static
org
.
mockito
.
ArgumentMatchers
.
eq
;
import
static
org
.
mockito
.
ArgumentMatchers
.
intThat
;
import
static
org
.
mockito
.
Mockito
.
lenient
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
...
...
@@ -92,6 +104,7 @@ public class PullMessageProcessorTest {
consumerData
.
getConsumeFromWhere
(),
consumerData
.
getSubscriptionDataSet
(),
false
);
brokerController
.
getTopicConfigManager
().
updateTopicConfig
(
new
TopicConfig
(
topic
));
}
@Test
...
...
@@ -192,6 +205,94 @@ public class PullMessageProcessorTest {
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
PULL_OFFSET_MOVED
);
}
@Test
public
void
testProcessRequest_LogicalQueue
()
throws
Exception
{
String
brokerName
=
brokerController
.
getBrokerConfig
().
getBrokerName
();
int
queueId
=
1
;
GetMessageResult
getMessageResult
=
createGetMessageResult
();
when
(
messageStore
.
getMessage
(
anyString
(),
eq
(
topic
),
eq
(
queueId
),
eq
(
456L
),
anyInt
(),
any
(
ExpressionMessageFilter
.
class
))).
thenReturn
(
getMessageResult
);
when
(
messageStore
.
getMaxOffsetInQueue
(
eq
(
topic
),
eq
(
queueId
))).
thenReturn
(
2000L
);
when
(
messageStore
.
getMinPhyOffset
()).
thenReturn
(
0L
);
LogicalQueuesInfoInBroker
logicalQueuesInfo
=
brokerController
.
getTopicConfigManager
().
getOrCreateLogicalQueuesInfo
(
topic
);
LogicalQueueRouteData
queueRouteData1
=
new
LogicalQueueRouteData
(
0
,
0
,
new
MessageQueue
(
topic
,
brokerName
,
queueId
),
MessageQueueRouteState
.
Normal
,
0
,
-
1
,
-
1
,
-
1
,
brokerController
.
getBrokerAddr
());
logicalQueuesInfo
.
put
(
0
,
Lists
.
newArrayList
(
queueRouteData1
));
logicalQueuesInfo
.
updateQueueRouteDataByQueueId
(
queueRouteData1
.
getQueueId
(),
queueRouteData1
);
// normal
{
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
// write only
queueRouteData1
.
setState
(
MessageQueueRouteState
.
WriteOnly
);
{
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
PULL_NOT_FOUND
);
}
// no message and redirect
queueRouteData1
.
setState
(
MessageQueueRouteState
.
ReadOnly
);
queueRouteData1
.
setOffsetMax
(
460
);
queueRouteData1
.
setFirstMsgTimeMillis
(
100
);
queueRouteData1
.
setLastMsgTimeMillis
(
200
);
LogicalQueueRouteData
queueRouteData2
=
new
LogicalQueueRouteData
(
0
,
460
,
new
MessageQueue
(
topic
,
"broker2"
,
1
),
MessageQueueRouteState
.
Normal
,
0
,
-
1
,
-
1
,
-
1
,
brokerController
.
getBrokerAddr
());
logicalQueuesInfo
.
get
(
0
).
add
(
queueRouteData2
);
getMessageResult
.
setStatus
(
GetMessageStatus
.
OFFSET_FOUND_NULL
);
when
(
messageStore
.
getCommitLogOffsetInQueue
(
eq
(
topic
),
eq
(
queueId
),
eq
(
460L
-
1L
))).
thenReturn
(
1000L
);
{
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
PULL_NOT_FOUND
);
assertThat
(
response
.
getExtFields
()).
containsKey
(
MessageConst
.
PROPERTY_REDIRECT
);
}
// same message queue has two routes
queueRouteData2
.
setState
(
MessageQueueRouteState
.
ReadOnly
);
queueRouteData2
.
setOffsetMax
(
50
);
queueRouteData2
.
setFirstMsgTimeMillis
(
300
);
queueRouteData2
.
setLastMsgTimeMillis
(
400
);
LogicalQueueRouteData
queueRouteData3
=
new
LogicalQueueRouteData
(
0
,
510
,
new
MessageQueue
(
topic
,
queueRouteData2
.
getBrokerName
(),
queueId
),
MessageQueueRouteState
.
Normal
,
460
,
-
1
,
-
1
,
-
1
,
queueRouteData1
.
getBrokerAddr
());
logicalQueuesInfo
.
get
(
0
).
add
(
queueRouteData3
);
logicalQueuesInfo
.
updateQueueRouteDataByQueueId
(
queueRouteData3
.
getQueueId
(),
queueRouteData3
);
{
GetMessageResult
getMessageResult2
=
createGetMessageResult
();
getMessageResult2
.
setStatus
(
GetMessageStatus
.
FOUND
);
getMessageResult2
.
setNextBeginOffset
(
460
);
when
(
messageStore
.
getMessage
(
anyString
(),
eq
(
queueRouteData1
.
getTopic
()),
eq
(
queueRouteData1
.
getQueueId
()),
eq
(
456L
),
eq
(
4
),
any
(
ExpressionMessageFilter
.
class
))).
thenReturn
(
getMessageResult2
);
}
{
GetMessageResult
getMessageResult2
=
createGetMessageResult
();
getMessageResult2
.
setStatus
(
GetMessageStatus
.
FOUND
);
getMessageResult2
.
setNextBeginOffset
(
470
);
lenient
().
when
(
messageStore
.
getMessage
(
anyString
(),
eq
(
queueRouteData1
.
getTopic
()),
eq
(
queueRouteData1
.
getQueueId
()),
eq
(
456L
),
intThat
(
i
->
i
>
4
),
any
(
ExpressionMessageFilter
.
class
))).
thenReturn
(
getMessageResult2
);
}
{
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
assertThat
(
ofNullable
(
response
.
getExtFields
()).
orElse
(
new
HashMap
<>())).
doesNotContainKey
(
MessageConst
.
PROPERTY_REDIRECT
);
PullMessageResponseHeader
header
=
(
PullMessageResponseHeader
)
response
.
readCustomHeader
();
assertThat
(
header
.
getNextBeginOffset
()).
isEqualTo
(
460
);
}
{
when
(
messageStore
.
getMinPhyOffset
()).
thenReturn
(
100000L
);
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
PULL_RETRY_IMMEDIATELY
);
assertThat
(
ofNullable
(
response
.
getExtFields
()).
orElse
(
new
HashMap
<>())).
containsKey
(
MessageConst
.
PROPERTY_REDIRECT
);
PullMessageResponseHeader
header
=
(
PullMessageResponseHeader
)
response
.
readCustomHeader
();
assertThat
(
header
.
getNextBeginOffset
()).
isEqualTo
(
460
);
}
}
private
RemotingCommand
createPullMsgCommand
(
int
requestCode
)
{
PullMessageRequestHeader
requestHeader
=
new
PullMessageRequestHeader
();
requestHeader
.
setCommitOffset
(
123L
);
...
...
This diff is collapsed.
Click to expand it.
broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
浏览文件 @
238f9bcc
...
...
@@ -18,18 +18,30 @@ package org.apache.rocketmq.broker.processor;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.net.InetSocketAddress
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageContext
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageHook
;
import
org.apache.rocketmq.broker.transaction.TransactionalMessageService
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.protocol.route.MessageQueueRouteState
;
import
org.apache.rocketmq.common.sysflag.MessageSysFlag
;
import
org.apache.rocketmq.common.topic.TopicValidator
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
...
...
@@ -43,6 +55,7 @@ import org.apache.rocketmq.store.MessageStore;
import
org.apache.rocketmq.store.PutMessageResult
;
import
org.apache.rocketmq.store.PutMessageStatus
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.assertj.core.util.Lists
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
...
...
@@ -52,12 +65,6 @@ import org.mockito.invocation.InvocationOnMock;
import
org.mockito.junit.MockitoJUnitRunner
;
import
org.mockito.stubbing.Answer
;
import
java.net.InetSocketAddress
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.CompletableFuture
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
...
...
@@ -90,6 +97,8 @@ public class SendMessageProcessorTest {
when
(
handlerContext
.
channel
()).
thenReturn
(
mockChannel
);
when
(
messageStore
.
lookMessageByOffset
(
anyLong
())).
thenReturn
(
new
MessageExt
());
sendMessageProcessor
=
new
SendMessageProcessor
(
brokerController
);
brokerController
.
getTopicConfigManager
().
updateTopicConfig
(
new
TopicConfig
(
topic
,
8
,
8
,
PermName
.
PERM_WRITE
|
PermName
.
PERM_READ
));
}
@Test
...
...
@@ -220,6 +229,62 @@ public class SendMessageProcessorTest {
assertThat
(
response
[
0
].
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
@Test
public
void
testProcessRequest_LogicalQueue
()
throws
Exception
{
when
(
messageStore
.
asyncPutMessage
(
any
(
MessageExtBrokerInner
.
class
)))
.
thenReturn
(
CompletableFuture
.
completedFuture
(
new
PutMessageResult
(
PutMessageStatus
.
PUT_OK
,
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
))));
LogicalQueuesInfoInBroker
logicalQueuesInfo
=
brokerController
.
getTopicConfigManager
().
getOrCreateLogicalQueuesInfo
(
topic
);
LogicalQueueRouteData
queueRouteData1
=
new
LogicalQueueRouteData
(
0
,
0
,
new
MessageQueue
(
topic
,
brokerController
.
getBrokerConfig
().
getBrokerName
(),
1
),
MessageQueueRouteState
.
Normal
,
0
,
-
1
,
-
1
,
-
1
,
brokerController
.
getBrokerAddr
());
logicalQueuesInfo
.
put
(
0
,
Lists
.
newArrayList
(
queueRouteData1
));
logicalQueuesInfo
.
updateQueueRouteDataByQueueId
(
queueRouteData1
.
getQueueId
(),
queueRouteData1
);
SendMessageRequestHeader
requestHeader
=
createSendMsgRequestHeader
();
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
SEND_MESSAGE
,
requestHeader
);
request
.
setBody
(
new
byte
[]
{
'a'
});
request
.
makeCustomHeaderToNet
();
// normal
RemotingCommand
responseToReturn
;
{
CompletableFuture
<
RemotingCommand
>
responseFuture
=
new
CompletableFuture
<>();
doAnswer
(
invocation
->
{
responseFuture
.
complete
(
invocation
.
getArgument
(
0
));
return
null
;
}).
when
(
handlerContext
).
writeAndFlush
(
any
(
Object
.
class
));
responseToReturn
=
sendMessageProcessor
.
processRequest
(
handlerContext
,
request
);
if
(
responseToReturn
==
null
)
{
responseToReturn
=
responseFuture
.
get
(
3
,
TimeUnit
.
SECONDS
);
}
}
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
assertThat
(
responseToReturn
.
getOpaque
()).
isEqualTo
(
request
.
getOpaque
());
// read only
queueRouteData1
.
setState
(
MessageQueueRouteState
.
ReadOnly
);
responseToReturn
=
sendMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
NO_PERMISSION
);
assertThat
(
responseToReturn
.
getRemark
()).
contains
(
"not writable"
);
// read only and forward
logicalQueuesInfo
.
get
(
0
).
add
(
new
LogicalQueueRouteData
(
0
,
100
,
new
MessageQueue
(
topic
,
"broker2"
,
1
),
MessageQueueRouteState
.
Normal
,
0
,
-
1
,
-
1
,
-
1
,
brokerController
.
getBrokerAddr
()));
responseToReturn
=
sendMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
SYSTEM_ERROR
);
assertThat
(
responseToReturn
.
getRemark
()).
contains
(
"forward error"
);
// read only and redirect
requestHeader
=
(
SendMessageRequestHeader
)
request
.
readCustomHeader
();
requestHeader
.
setSysFlag
(
MessageSysFlag
.
LOGICAL_QUEUE_FLAG
);
request
.
makeCustomHeaderToNet
();
responseToReturn
=
sendMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
NO_PERMISSION
);
assertThat
(
responseToReturn
.
getExtFields
()).
containsKey
(
MessageConst
.
PROPERTY_REDIRECT
);
}
private
RemotingCommand
createSendTransactionMsgCommand
(
int
requestCode
)
{
SendMessageRequestHeader
header
=
createSendMsgRequestHeader
();
int
sysFlag
=
header
.
getSysFlag
();
...
...
This diff is collapsed.
Click to expand it.
broker/src/test/java/org/apache/rocketmq/broker/topic/TopicConfigManagerTest.java
0 → 100644
浏览文件 @
238f9bcc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.topic
;
import
com.google.common.collect.Lists
;
import
java.nio.file.Files
;
import
java.nio.file.Paths
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.List
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.DataVersion
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.route.LogicalQueueRouteData
;
import
org.apache.rocketmq.common.protocol.route.MessageQueueRouteState
;
import
org.apache.rocketmq.store.DefaultMessageStore
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.junit.After
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.ArgumentMatchers
;
import
org.mockito.Mock
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
ArgumentMatchers
.
eq
;
import
static
org
.
mockito
.
Mockito
.
never
;
import
static
org
.
mockito
.
Mockito
.
verify
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
TopicConfigManagerTest
{
@Mock
private
DefaultMessageStore
messageStore
;
@Mock
private
BrokerController
brokerController
;
private
TopicConfigManager
topicConfigManager
;
private
static
final
String
topic
=
"FooBar"
;
private
static
final
String
broker1Name
=
"broker1"
;
private
static
final
String
broker1Addr
=
"127.0.0.1:12345"
;
private
static
final
int
queueId1
=
1
;
private
static
final
String
broker2Name
=
"broker2"
;
private
static
final
String
broker2Addr
=
"127.0.0.2:12345"
;
private
static
final
int
queueId2
=
2
;
@Before
public
void
before
()
{
BrokerConfig
brokerConfig
=
new
BrokerConfig
();
brokerConfig
.
setBrokerName
(
broker1Name
);
when
(
brokerController
.
getBrokerConfig
()).
thenReturn
(
brokerConfig
);
when
(
brokerController
.
getMessageStore
()).
thenReturn
(
messageStore
);
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setStorePathRootDir
(
System
.
getProperty
(
"java.io.tmpdir"
));
when
(
brokerController
.
getMessageStoreConfig
()).
thenReturn
(
messageStoreConfig
);
this
.
topicConfigManager
=
new
TopicConfigManager
(
brokerController
);
this
.
topicConfigManager
.
getTopicConfigTable
().
put
(
topic
,
new
TopicConfig
(
topic
));
}
@After
public
void
after
()
throws
Exception
{
if
(
topicConfigManager
!=
null
)
{
Files
.
deleteIfExists
(
Paths
.
get
(
topicConfigManager
.
configFilePath
()));
}
}
@Test
public
void
logicalQueueCleanTest
()
{
LogicalQueuesInfoInBroker
info
=
this
.
topicConfigManager
.
getOrCreateLogicalQueuesInfo
(
topic
);
topicConfigManager
.
logicalQueueClean
();
assertThat
(
info
).
isEmpty
();
final
int
logicalQueueIndex
=
0
;
LogicalQueueRouteData
queueRouteData1
=
new
LogicalQueueRouteData
(
logicalQueueIndex
,
0
,
new
MessageQueue
(
topic
,
broker1Name
,
queueId1
),
MessageQueueRouteState
.
Normal
,
0
,
-
1
,
-
1
,
-
1
,
broker1Addr
);
List
<
LogicalQueueRouteData
>
l
=
Lists
.
newArrayList
(
new
LogicalQueueRouteData
(
queueRouteData1
));
info
.
put
(
logicalQueueIndex
,
l
);
topicConfigManager
.
logicalQueueClean
();
assertThat
(
info
.
get
(
logicalQueueIndex
)).
isEqualTo
(
Collections
.
singletonList
(
queueRouteData1
));
verify
(
messageStore
,
never
()).
getCommitLogOffsetInQueue
(
eq
(
topic
),
eq
(
queueId1
),
anyLong
());
verify
(
messageStore
,
never
()).
getMinPhyOffset
();
verify
(
brokerController
,
never
()).
registerIncrementBrokerData
(
ArgumentMatchers
.<
TopicConfig
>
argThat
(
arg
->
topic
.
equals
(
arg
.
getTopicName
())),
any
(
DataVersion
.
class
));
LogicalQueueRouteData
queueRouteData2
=
new
LogicalQueueRouteData
(
logicalQueueIndex
,
100
,
new
MessageQueue
(
topic
,
broker2Name
,
queueId2
),
MessageQueueRouteState
.
Normal
,
0
,
-
1
,
-
1
,
-
1
,
broker2Addr
);
l
.
add
(
new
LogicalQueueRouteData
(
queueRouteData2
));
queueRouteData1
=
l
.
get
(
0
);
queueRouteData1
.
setState
(
MessageQueueRouteState
.
ReadOnly
);
queueRouteData1
.
setOffsetMax
(
100
);
queueRouteData1
.
setFirstMsgTimeMillis
(
200
);
queueRouteData1
.
setLastMsgTimeMillis
(
300
);
queueRouteData1
=
new
LogicalQueueRouteData
(
queueRouteData1
);
LogicalQueueRouteData
queueRouteData3
=
new
LogicalQueueRouteData
(
logicalQueueIndex
,
200
,
new
MessageQueue
(
topic
,
broker1Name
,
queueId1
),
MessageQueueRouteState
.
Normal
,
100
,
-
1
,
-
1
,
-
1
,
broker1Addr
);
l
.
add
(
new
LogicalQueueRouteData
(
queueRouteData3
));
queueRouteData2
=
l
.
get
(
1
);
queueRouteData2
.
setState
(
MessageQueueRouteState
.
ReadOnly
);
queueRouteData2
.
setOffsetMax
(
100
);
queueRouteData2
.
setFirstMsgTimeMillis
(
400
);
queueRouteData2
.
setLastMsgTimeMillis
(
500
);
queueRouteData2
=
new
LogicalQueueRouteData
(
queueRouteData2
);
when
(
messageStore
.
getCommitLogOffsetInQueue
(
eq
(
topic
),
eq
(
queueId1
),
eq
(
queueRouteData1
.
getOffsetMax
()
-
1
))).
thenReturn
(
1000L
);
when
(
messageStore
.
getMinPhyOffset
()).
thenReturn
(
0L
);
topicConfigManager
.
logicalQueueClean
();
assertThat
(
info
.
get
(
logicalQueueIndex
)).
isEqualTo
(
Arrays
.
asList
(
queueRouteData1
,
queueRouteData2
,
queueRouteData3
));
verify
(
messageStore
).
getCommitLogOffsetInQueue
(
eq
(
topic
),
eq
(
queueId1
),
eq
(
queueRouteData1
.
getOffsetMax
()
-
1
));
verify
(
messageStore
).
getMinPhyOffset
();
verify
(
brokerController
,
never
()).
registerIncrementBrokerData
(
ArgumentMatchers
.<
TopicConfig
>
argThat
(
arg
->
topic
.
equals
(
arg
.
getTopicName
())),
any
(
DataVersion
.
class
));
when
(
messageStore
.
getMinPhyOffset
()).
thenReturn
(
2000L
);
topicConfigManager
.
logicalQueueClean
();
assertThat
(
info
.
get
(
logicalQueueIndex
)).
isEqualTo
(
Collections
.
singletonList
(
queueRouteData3
));
verify
(
brokerController
).
registerIncrementBrokerData
(
ArgumentMatchers
.<
TopicConfig
>
argThat
(
arg
->
topic
.
equals
(
arg
.
getTopicName
())),
any
(
DataVersion
.
class
));
}
}
\ No newline at end of file
This diff is collapsed.
Click to expand it.
srvutil/src/main/java/org/apache/rocketmq/srvutil/ConcurrentHashMapUtil.java
0 → 100644
浏览文件 @
238f9bcc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.srvutil
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.function.Function
;
public
class
ConcurrentHashMapUtil
{
private
static
final
boolean
IS_JDK8
;
static
{
// Java 8 or lower: 1.6.0_23, 1.7.0, 1.7.0_80, 1.8.0_211
// Java 9 or higher: 9.0.1, 11.0.4, 12, 12.0.1
IS_JDK8
=
System
.
getProperty
(
"java.version"
).
startsWith
(
"1.8."
);
}
private
ConcurrentHashMapUtil
()
{
}
/**
* A temporary workaround for Java 8 specific performance issue JDK-8161372 .<br> Use implementation of
* ConcurrentMap.computeIfAbsent instead.
*
* @see <a href="https://bugs.openjdk.java.net/browse/JDK-8161372">https://bugs.openjdk.java.net/browse/JDK-8161372</a>
*/
public
static
<
K
,
V
>
V
computeIfAbsent
(
ConcurrentMap
<
K
,
V
>
map
,
K
key
,
Function
<?
super
K
,
?
extends
V
>
func
)
{
if
(
IS_JDK8
)
{
V
v
,
newValue
;
return
((
v
=
map
.
get
(
key
))
==
null
&&
(
newValue
=
func
.
apply
(
key
))
!=
null
&&
(
v
=
map
.
putIfAbsent
(
key
,
newValue
))
==
null
)
?
newValue
:
v
;
}
else
{
return
map
.
computeIfAbsent
(
key
,
func
);
}
}
}
This diff is collapsed.
Click to expand it.
store/src/main/java/org/apache/rocketmq/store/CleanFilesHook.java
0 → 100644
浏览文件 @
238f9bcc
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.store
;
public
interface
CleanFilesHook
{
void
execute
(
DefaultMessageStore
defaultMessageStore
,
long
deleteCount
);
String
getName
();
}
This diff is collapsed.
Click to expand it.
store/src/main/java/org/apache/rocketmq/store/CommitLog.java
浏览文件 @
238f9bcc
...
...
@@ -140,7 +140,17 @@ public class CommitLog {
final
long
intervalForcibly
,
final
boolean
cleanImmediately
)
{
return
this
.
mappedFileQueue
.
deleteExpiredFileByTime
(
expiredTime
,
deleteFilesInterval
,
intervalForcibly
,
cleanImmediately
);
return
deleteExpiredFile
(
expiredTime
,
deleteFilesInterval
,
intervalForcibly
,
cleanImmediately
,
0
);
}
public
int
deleteExpiredFile
(
final
long
expiredTime
,
final
int
deleteFilesInterval
,
final
long
intervalForcibly
,
final
boolean
cleanImmediately
,
final
int
deleteFileBatchMax
)
{
return
this
.
mappedFileQueue
.
deleteExpiredFileByTime
(
expiredTime
,
deleteFilesInterval
,
intervalForcibly
,
cleanImmediately
,
deleteFileBatchMax
);
}
/**
...
...
This diff is collapsed.
Click to expand it.
store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
浏览文件 @
238f9bcc
...
...
@@ -28,12 +28,14 @@ import java.util.Collections;
import
java.util.HashMap
;
import
java.util.Iterator
;
import
java.util.LinkedList
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map.Entry
;
import
java.util.Set
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.ConcurrentMap
;
import
java.util.concurrent.CopyOnWriteArrayList
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
...
...
@@ -118,6 +120,8 @@ public class DefaultMessageStore implements MessageStore {
private
final
ScheduledExecutorService
diskCheckScheduledExecutorService
=
Executors
.
newSingleThreadScheduledExecutor
(
new
ThreadFactoryImpl
(
"DiskCheckScheduledThread"
));
private
final
List
<
CleanFilesHook
>
cleanFilesHooks
=
new
CopyOnWriteArrayList
<>();
public
DefaultMessageStore
(
final
MessageStoreConfig
messageStoreConfig
,
final
BrokerStatsManager
brokerStatsManager
,
final
MessageArrivingListener
messageArrivingListener
,
final
BrokerConfig
brokerConfig
)
throws
IOException
{
this
.
messageArrivingListener
=
messageArrivingListener
;
...
...
@@ -720,10 +724,20 @@ public class DefaultMessageStore implements MessageStore {
}
public
long
getMaxOffsetInQueue
(
String
topic
,
int
queueId
)
{
ConsumeQueue
logic
=
this
.
findConsumeQueue
(
topic
,
queueId
);
if
(
logic
!=
null
)
{
long
offset
=
logic
.
getMaxOffsetInQueue
();
return
offset
;
return
getMaxOffsetInQueue
(
topic
,
queueId
,
true
);
}
public
long
getMaxOffsetInQueue
(
String
topic
,
int
queueId
,
boolean
committed
)
{
if
(
committed
)
{
ConsumeQueue
logic
=
this
.
findConsumeQueue
(
topic
,
queueId
);
if
(
logic
!=
null
)
{
return
logic
.
getMaxOffsetInQueue
();
}
}
else
{
Long
offset
=
this
.
commitLog
.
getTopicQueueTable
().
get
(
topic
+
"-"
+
queueId
);
if
(
offset
!=
null
)
{
return
offset
;
}
}
return
0
;
...
...
@@ -1301,12 +1315,23 @@ public class DefaultMessageStore implements MessageStore {
log
.
info
(
fileName
+
(
result
?
" create OK"
:
" already exists"
));
}
public
void
registerCleanFileHook
(
CleanFilesHook
hook
)
{
this
.
cleanFilesHooks
.
add
(
hook
);
}
private
void
addScheduleTask
()
{
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
public
void
run
()
{
DefaultMessageStore
.
this
.
cleanFilesPeriodically
();
long
deleteCount
=
DefaultMessageStore
.
this
.
cleanFilesPeriodically
();
DefaultMessageStore
.
this
.
cleanFilesHooks
.
forEach
(
hook
->
{
try
{
hook
.
execute
(
DefaultMessageStore
.
this
,
deleteCount
);
}
catch
(
Throwable
t
)
{
log
.
error
(
"execute CleanFilesHook[{}] error"
,
hook
.
getName
(),
t
);
}
});
}
},
1000
*
60
,
this
.
messageStoreConfig
.
getCleanResourceInterval
(),
TimeUnit
.
MILLISECONDS
);
...
...
@@ -1351,9 +1376,11 @@ public class DefaultMessageStore implements MessageStore {
},
1000L
,
10000L
,
TimeUnit
.
MILLISECONDS
);
}
private
void
cleanFilesPeriodically
()
{
this
.
cleanCommitLogService
.
run
();
this
.
cleanConsumeQueueService
.
run
();
private
long
cleanFilesPeriodically
()
{
long
deleteCount
=
0L
;
deleteCount
+=
this
.
cleanCommitLogService
.
run
();
deleteCount
+=
this
.
cleanConsumeQueueService
.
run
();
return
deleteCount
;
}
private
void
checkSelf
()
{
...
...
@@ -1611,17 +1638,19 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore
.
log
.
info
(
"executeDeleteFilesManually was invoked"
);
}
public
void
run
()
{
public
long
run
()
{
int
deleteCount
=
0
;
try
{
this
.
deleteExpiredFiles
();
deleteCount
=
this
.
deleteExpiredFiles
();
this
.
redeleteHangedFile
();
}
catch
(
Throwable
e
)
{
DefaultMessageStore
.
log
.
warn
(
this
.
getServiceName
()
+
" service has exception. "
,
e
);
}
return
deleteCount
;
}
private
void
deleteExpiredFiles
()
{
private
int
deleteExpiredFiles
()
{
int
deleteCount
=
0
;
long
fileReservedTime
=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getFileReservedTime
();
int
deletePhysicFilesInterval
=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getDeleteCommitLogFilesInterval
();
...
...
@@ -1654,6 +1683,7 @@ public class DefaultMessageStore implements MessageStore {
log
.
warn
(
"disk space will be full soon, but delete file failed."
);
}
}
return
deleteCount
;
}
private
void
redeleteHangedFile
()
{
...
...
@@ -1775,17 +1805,20 @@ public class DefaultMessageStore implements MessageStore {
class
CleanConsumeQueueService
{
private
long
lastPhysicalMinOffset
=
0
;
public
void
run
()
{
public
long
run
()
{
long
deleteCount
=
0
;
try
{
this
.
deleteExpiredFiles
();
deleteCount
=
this
.
deleteExpiredFiles
();
}
catch
(
Throwable
e
)
{
DefaultMessageStore
.
log
.
warn
(
this
.
getServiceName
()
+
" service has exception. "
,
e
);
}
return
deleteCount
;
}
private
void
deleteExpiredFiles
()
{
private
long
deleteExpiredFiles
()
{
int
deleteLogicsFilesInterval
=
DefaultMessageStore
.
this
.
getMessageStoreConfig
().
getDeleteConsumeQueueFilesInterval
();
long
deleteCountSum
=
0L
;
long
minOffset
=
DefaultMessageStore
.
this
.
commitLog
.
getMinOffset
();
if
(
minOffset
>
this
.
lastPhysicalMinOffset
)
{
this
.
lastPhysicalMinOffset
=
minOffset
;
...
...
@@ -1795,7 +1828,7 @@ public class DefaultMessageStore implements MessageStore {
for
(
ConcurrentMap
<
Integer
,
ConsumeQueue
>
maps
:
tables
.
values
())
{
for
(
ConsumeQueue
logic
:
maps
.
values
())
{
int
deleteCount
=
logic
.
deleteExpiredFile
(
minOffset
);
deleteCountSum
+=
deleteCount
;
if
(
deleteCount
>
0
&&
deleteLogicsFilesInterval
>
0
)
{
try
{
Thread
.
sleep
(
deleteLogicsFilesInterval
);
...
...
@@ -1807,6 +1840,7 @@ public class DefaultMessageStore implements MessageStore {
DefaultMessageStore
.
this
.
indexService
.
deleteExpiredFile
(
minOffset
);
}
return
deleteCountSum
;
}
public
String
getServiceName
()
{
...
...
This diff is collapsed.
Click to expand it.
store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
浏览文件 @
238f9bcc
...
...
@@ -56,8 +56,8 @@ public class MappedFileQueue {
}
public
void
checkSelf
()
{
if
(!
this
.
mappedFiles
.
isEmpty
())
{
List
<
MappedFile
>
mappedFiles
=
new
ArrayList
<>(
this
.
mappedFiles
);
if
(!
mappedFiles
.
isEmpty
())
{
Iterator
<
MappedFile
>
iterator
=
mappedFiles
.
iterator
();
MappedFile
pre
=
null
;
while
(
iterator
.
hasNext
())
{
...
...
@@ -238,21 +238,8 @@ public class MappedFileQueue {
}
public
MappedFile
getLastMappedFile
()
{
MappedFile
mappedFileLast
=
null
;
while
(!
this
.
mappedFiles
.
isEmpty
())
{
try
{
mappedFileLast
=
this
.
mappedFiles
.
get
(
this
.
mappedFiles
.
size
()
-
1
);
break
;
}
catch
(
IndexOutOfBoundsException
e
)
{
//continue;
}
catch
(
Exception
e
)
{
log
.
error
(
"getLastMappedFile has exception."
,
e
);
break
;
}
}
return
mappedFileLast
;
MappedFile
[]
mappedFiles
=
this
.
mappedFiles
.
toArray
(
new
MappedFile
[
0
]);
return
mappedFiles
.
length
==
0
?
null
:
mappedFiles
[
mappedFiles
.
length
-
1
];
}
public
boolean
resetOffset
(
long
offset
)
{
...
...
@@ -336,7 +323,11 @@ public class MappedFileQueue {
public
int
deleteExpiredFileByTime
(
final
long
expiredTime
,
final
int
deleteFilesInterval
,
final
long
intervalForcibly
,
final
boolean
cleanImmediately
)
{
final
boolean
cleanImmediately
,
int
deleteFileBatchMax
)
{
if
(
deleteFileBatchMax
==
0
)
{
deleteFileBatchMax
=
DELETE_FILES_BATCH_MAX
;
}
Object
[]
mfs
=
this
.
copyMappedFiles
(
0
);
if
(
null
==
mfs
)
...
...
@@ -354,7 +345,7 @@ public class MappedFileQueue {
files
.
add
(
mappedFile
);
deleteCount
++;
if
(
files
.
size
()
>=
DELETE_FILES_BATCH_MAX
)
{
if
(
files
.
size
()
>=
deleteFileBatchMax
)
{
break
;
}
...
...
This diff is collapsed.
Click to expand it.
store/src/main/java/org/apache/rocketmq/store/MessageStore.java
浏览文件 @
238f9bcc
...
...
@@ -115,6 +115,16 @@ public interface MessageStore {
*/
long
getMaxOffsetInQueue
(
final
String
topic
,
final
int
queueId
);
/**
* Get maximum offset of the topic queue.
*
* @param topic Topic name.
* @param queueId Queue ID.
* @param committed If only count committed
* @return Maximum offset at present.
*/
long
getMaxOffsetInQueue
(
final
String
topic
,
final
int
queueId
,
final
boolean
committed
);
/**
* Get the minimum offset of the topic queue.
*
...
...
This diff is collapsed.
Click to expand it.
store/src/test/java/org/apache/rocketmq/store/MappedFileQueueTest.java
浏览文件 @
238f9bcc
...
...
@@ -225,7 +225,7 @@ public class MappedFileQueueTest {
mappedFile
.
getFile
().
setLastModified
(
System
.
currentTimeMillis
()
-
expiredTime
*
2
);
}
}
mappedFileQueue
.
deleteExpiredFileByTime
(
expiredTime
,
0
,
0
,
false
);
mappedFileQueue
.
deleteExpiredFileByTime
(
expiredTime
,
0
,
0
,
false
,
Integer
.
MAX_VALUE
);
assertThat
(
mappedFileQueue
.
getMappedFiles
().
size
()).
isEqualTo
(
45
);
}
...
...
This diff is collapsed.
Click to expand it.
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录
反馈
建议
客服
返回
顶部