Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
ced6b023
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
ced6b023
编写于
3月 09, 2021
作者:
A
ayanamist
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[RIP-19] Pop Consuming (broker)
上级
ea36854b
变更
26
展开全部
隐藏空白更改
内联
并排
Showing
26 changed file
with
4616 addition
and
31 deletion
+4616
-31
broker/pom.xml
broker/pom.xml
+4
-0
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+134
-20
broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
...va/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+8
-0
broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
...apache/rocketmq/broker/loadbalance/AssignmentManager.java
+149
-0
broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
...ocketmq/broker/loadbalance/MessageRequestModeManager.java
+99
-0
broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
...tmq/broker/longpolling/NotifyMessageArrivingListener.java
+6
-3
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
...va/org/apache/rocketmq/broker/longpolling/PopRequest.java
+87
-0
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
...ache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+426
-0
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
...n/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
+40
-2
broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
...apache/rocketmq/broker/processor/AckMessageProcessor.java
+188
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
...cketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+195
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
...ache/rocketmq/broker/processor/PopBufferMergeService.java
+731
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
...apache/rocketmq/broker/processor/PopMessageProcessor.java
+967
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
...rg/apache/rocketmq/broker/processor/PopReviveService.java
+461
-0
broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
...e/rocketmq/broker/processor/QueryAssignmentProcessor.java
+307
-0
broker/src/main/java/org/apache/rocketmq/broker/util/MsgUtil.java
...rc/main/java/org/apache/rocketmq/broker/util/MsgUtil.java
+34
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
...he/rocketmq/broker/processor/AckMessageProcessorTest.java
+132
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
...mq/broker/processor/ChangeInvisibleTimeProcessorTest.java
+133
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
.../rocketmq/broker/processor/PopBufferMergeServiceTest.java
+108
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
...he/rocketmq/broker/processor/PopMessageProcessorTest.java
+191
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
...cketmq/broker/processor/QueryAssignmentProcessorTest.java
+153
-0
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
...ent/consumer/rebalance/AllocateMessageQueueAveragely.java
+10
-2
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
...umer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+10
-2
distribution/conf/logback_broker.xml
distribution/conf/logback_broker.xml
+29
-0
pom.xml
pom.xml
+6
-1
remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
...a/org/apache/rocketmq/remoting/common/RemotingHelper.java
+8
-1
未找到文件。
broker/pom.xml
浏览文件 @
ced6b023
...
...
@@ -66,6 +66,10 @@
<groupId>
org.slf4j
</groupId>
<artifactId>
slf4j-api
</artifactId>
</dependency>
<dependency>
<groupId>
com.googlecode.concurrentlinkedhashmap
</groupId>
<artifactId>
concurrentlinkedhashmap-lru
</artifactId>
</dependency>
</dependencies>
<build>
...
...
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
ced6b023
...
...
@@ -47,19 +47,25 @@ import org.apache.rocketmq.broker.filter.ConsumerFilterManager;
import
org.apache.rocketmq.broker.filtersrv.FilterServerManager
;
import
org.apache.rocketmq.broker.latency.BrokerFastFailure
;
import
org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor
;
import
org.apache.rocketmq.broker.loadbalance.AssignmentManager
;
import
org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener
;
import
org.apache.rocketmq.broker.longpolling.PullRequestHoldService
;
import
org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageHook
;
import
org.apache.rocketmq.broker.offset.ConsumerOffsetManager
;
import
org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager
;
import
org.apache.rocketmq.broker.out.BrokerOuterAPI
;
import
org.apache.rocketmq.broker.plugin.MessageStoreFactory
;
import
org.apache.rocketmq.broker.plugin.MessageStorePluginContext
;
import
org.apache.rocketmq.broker.processor.AckMessageProcessor
;
import
org.apache.rocketmq.broker.processor.AdminBrokerProcessor
;
import
org.apache.rocketmq.broker.processor.ChangeInvisibleTimeProcessor
;
import
org.apache.rocketmq.broker.processor.ClientManageProcessor
;
import
org.apache.rocketmq.broker.processor.ConsumerManageProcessor
;
import
org.apache.rocketmq.broker.processor.EndTransactionProcessor
;
import
org.apache.rocketmq.broker.processor.PopMessageProcessor
;
import
org.apache.rocketmq.broker.processor.PullMessageProcessor
;
import
org.apache.rocketmq.broker.processor.QueryAssignmentProcessor
;
import
org.apache.rocketmq.broker.processor.QueryMessageProcessor
;
import
org.apache.rocketmq.broker.processor.ReplyMessageProcessor
;
import
org.apache.rocketmq.broker.processor.SendMessageProcessor
;
...
...
@@ -118,9 +124,18 @@ public class BrokerController {
private
final
ConsumerOffsetManager
consumerOffsetManager
;
private
final
ConsumerManager
consumerManager
;
private
final
ConsumerFilterManager
consumerFilterManager
;
private
final
ConsumerOrderInfoManager
consumerOrderInfoManager
;
private
final
ProducerManager
producerManager
;
private
final
AssignmentManager
assignmentManager
;
private
final
ClientHousekeepingService
clientHousekeepingService
;
private
final
PullMessageProcessor
pullMessageProcessor
;
private
final
PopMessageProcessor
popMessageProcessor
;
private
final
AckMessageProcessor
ackMessageProcessor
;
private
final
ChangeInvisibleTimeProcessor
changeInvisibleTimeProcessor
;
private
final
QueryAssignmentProcessor
queryAssignmentProcessor
;
private
final
ClientManageProcessor
clientManageProcessor
;
private
final
SendMessageProcessor
sendMessageProcessor
;
private
final
PullRequestHoldService
pullRequestHoldService
;
private
final
MessageArrivingListener
messageArrivingListener
;
private
final
Broker2Client
broker2Client
;
...
...
@@ -132,6 +147,7 @@ public class BrokerController {
"BrokerControllerScheduledThread"
));
private
final
SlaveSynchronize
slaveSynchronize
;
private
final
BlockingQueue
<
Runnable
>
sendThreadPoolQueue
;
private
final
BlockingQueue
<
Runnable
>
ackThreadPoolQueue
;
private
final
BlockingQueue
<
Runnable
>
pullThreadPoolQueue
;
private
final
BlockingQueue
<
Runnable
>
replyThreadPoolQueue
;
private
final
BlockingQueue
<
Runnable
>
queryThreadPoolQueue
;
...
...
@@ -149,12 +165,14 @@ public class BrokerController {
private
TopicConfigManager
topicConfigManager
;
private
ExecutorService
sendMessageExecutor
;
private
ExecutorService
pullMessageExecutor
;
private
ExecutorService
ackMessageExecutor
;
private
ExecutorService
replyMessageExecutor
;
private
ExecutorService
queryMessageExecutor
;
private
ExecutorService
adminBrokerExecutor
;
private
ExecutorService
clientManageExecutor
;
private
ExecutorService
heartbeatExecutor
;
private
ExecutorService
consumerManageExecutor
;
private
ExecutorService
loadBalanceExecutor
;
private
ExecutorService
endTransactionExecutor
;
private
boolean
updateMasterHAServerAddrPeriodically
=
false
;
private
BrokerStats
brokerStats
;
...
...
@@ -167,6 +185,7 @@ public class BrokerController {
private
AbstractTransactionalMessageCheckListener
transactionalMessageCheckListener
;
private
Future
<?>
slaveSyncFuture
;
private
Map
<
Class
,
AccessValidator
>
accessValidatorMap
=
new
HashMap
<
Class
,
AccessValidator
>();
private
long
shouldStartTime
;
public
BrokerController
(
final
BrokerConfig
brokerConfig
,
...
...
@@ -182,10 +201,16 @@ public class BrokerController {
this
.
topicConfigManager
=
new
TopicConfigManager
(
this
);
this
.
pullMessageProcessor
=
new
PullMessageProcessor
(
this
);
this
.
pullRequestHoldService
=
new
PullRequestHoldService
(
this
);
this
.
messageArrivingListener
=
new
NotifyMessageArrivingListener
(
this
.
pullRequestHoldService
);
this
.
popMessageProcessor
=
new
PopMessageProcessor
(
this
);
this
.
ackMessageProcessor
=
new
AckMessageProcessor
(
this
);
this
.
changeInvisibleTimeProcessor
=
new
ChangeInvisibleTimeProcessor
(
this
);
this
.
sendMessageProcessor
=
new
SendMessageProcessor
(
this
);
this
.
messageArrivingListener
=
new
NotifyMessageArrivingListener
(
this
.
pullRequestHoldService
,
this
.
popMessageProcessor
);
this
.
consumerIdsChangeListener
=
new
DefaultConsumerIdsChangeListener
(
this
);
this
.
consumerManager
=
new
ConsumerManager
(
this
.
consumerIdsChangeListener
);
this
.
consumerFilterManager
=
new
ConsumerFilterManager
(
this
);
this
.
consumerOrderInfoManager
=
new
ConsumerOrderInfoManager
(
this
);
this
.
producerManager
=
new
ProducerManager
();
this
.
clientHousekeepingService
=
new
ClientHousekeepingService
(
this
);
this
.
broker2Client
=
new
Broker2Client
(
this
);
...
...
@@ -193,10 +218,14 @@ public class BrokerController {
this
.
brokerOuterAPI
=
new
BrokerOuterAPI
(
nettyClientConfig
);
this
.
filterServerManager
=
new
FilterServerManager
(
this
);
this
.
assignmentManager
=
new
AssignmentManager
(
this
);
this
.
queryAssignmentProcessor
=
new
QueryAssignmentProcessor
(
this
);
this
.
clientManageProcessor
=
new
ClientManageProcessor
(
this
);
this
.
slaveSynchronize
=
new
SlaveSynchronize
(
this
);
this
.
sendThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getSendThreadPoolQueueCapacity
());
this
.
pullThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getPullThreadPoolQueueCapacity
());
this
.
ackThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getAckThreadPoolQueueCapacity
());
this
.
replyThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getReplyThreadPoolQueueCapacity
());
this
.
queryThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getQueryThreadPoolQueueCapacity
());
this
.
clientManagerThreadPoolQueue
=
new
LinkedBlockingQueue
<
Runnable
>(
this
.
brokerConfig
.
getClientManagerThreadPoolQueueCapacity
());
...
...
@@ -215,6 +244,14 @@ public class BrokerController {
);
}
public
ConsumerIdsChangeListener
getConsumerIdsChangeListener
()
{
return
consumerIdsChangeListener
;
}
public
ClientManageProcessor
getClientManageProcessor
()
{
return
clientManageProcessor
;
}
public
BrokerConfig
getBrokerConfig
()
{
return
brokerConfig
;
}
...
...
@@ -281,6 +318,15 @@ public class BrokerController {
this
.
pullThreadPoolQueue
,
new
ThreadFactoryImpl
(
"PullMessageThread_"
));
this
.
ackMessageExecutor
=
new
BrokerFixedThreadPoolExecutor
(
this
.
brokerConfig
.
getAckMessageThreadPoolNums
(),
this
.
brokerConfig
.
getAckMessageThreadPoolNums
(),
1000
*
60
,
TimeUnit
.
MILLISECONDS
,
this
.
ackThreadPoolQueue
,
new
ThreadFactoryImpl
(
"AckMessageThread_"
));
this
.
replyMessageExecutor
=
new
BrokerFixedThreadPoolExecutor
(
this
.
brokerConfig
.
getProcessReplyMessageThreadPoolNums
(),
this
.
brokerConfig
.
getProcessReplyMessageThreadPoolNums
(),
...
...
@@ -400,6 +446,10 @@ public class BrokerController {
}
},
1000
*
10
,
1000
*
60
,
TimeUnit
.
MILLISECONDS
);
this
.
loadBalanceExecutor
=
Executors
.
newFixedThreadPool
(
this
.
brokerConfig
.
getLoadBalanceProcessorThreadPoolNums
(),
new
ThreadFactoryImpl
(
"LoadBalanceProcessorThread_"
));
if
(
this
.
brokerConfig
.
getNamesrvAddr
()
!=
null
)
{
this
.
brokerOuterAPI
.
updateNameServerAddressList
(
this
.
brokerConfig
.
getNamesrvAddr
());
log
.
info
(
"Set user specified name server address: {}"
,
this
.
brokerConfig
.
getNamesrvAddr
());
...
...
@@ -547,23 +597,38 @@ public class BrokerController {
/**
* SendMessageProcessor
*/
SendMessageProcessor
sendProcessor
=
new
SendMessageProcessor
(
this
);
sendProcessor
.
registerSendMessageHook
(
sendMessageHookList
);
sendProcessor
.
registerConsumeMessageHook
(
consumeMessageHookList
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE_V2
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_BATCH_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CONSUMER_SEND_MSG_BACK
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE_V2
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_BATCH_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CONSUMER_SEND_MSG_BACK
,
sendProcessor
,
this
.
sendMessageExecutor
);
send
Message
Processor
.
registerSendMessageHook
(
sendMessageHookList
);
send
Message
Processor
.
registerConsumeMessageHook
(
consumeMessageHookList
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE_V2
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_BATCH_MESSAGE
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CONSUMER_SEND_MSG_BACK
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE_V2
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SEND_BATCH_MESSAGE
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CONSUMER_SEND_MSG_BACK
,
send
Message
Processor
,
this
.
sendMessageExecutor
);
/**
* PullMessageProcessor
*/
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
PULL_MESSAGE
,
this
.
pullMessageProcessor
,
this
.
pullMessageExecutor
);
this
.
pullMessageProcessor
.
registerConsumeMessageHook
(
consumeMessageHookList
);
/**
* PopMessageProcessor
*/
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
POP_MESSAGE
,
this
.
popMessageProcessor
,
this
.
pullMessageExecutor
);
/**
* AckMessageProcessor
*/
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
ACK_MESSAGE
,
this
.
ackMessageProcessor
,
this
.
ackMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
ACK_MESSAGE
,
this
.
ackMessageProcessor
,
this
.
ackMessageExecutor
);
/**
* ChangeInvisibleTimeProcessor
*/
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CHANGE_MESSAGE_INVISIBLETIME
,
this
.
changeInvisibleTimeProcessor
,
this
.
ackMessageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CHANGE_MESSAGE_INVISIBLETIME
,
this
.
changeInvisibleTimeProcessor
,
this
.
ackMessageExecutor
);
/**
* ReplyMessageProcessor
...
...
@@ -589,14 +654,13 @@ public class BrokerController {
/**
* ClientManageProcessor
*/
ClientManageProcessor
clientProcessor
=
new
ClientManageProcessor
(
this
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
clientProcessor
,
this
.
heartbeatExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
clientManageProcessor
,
this
.
heartbeatExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
clientManageProcessor
,
this
.
clientManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
clientManageProcessor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
clientProcessor
,
this
.
heartbeatExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
client
Manage
Processor
,
this
.
heartbeatExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
/**
* ConsumerManageProcessor
...
...
@@ -610,6 +674,14 @@ public class BrokerController {
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UPDATE_CONSUMER_OFFSET
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
QUERY_CONSUMER_OFFSET
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
/**
* QueryAssignmentProcessor
*/
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
QUERY_ASSIGNMENT
,
queryAssignmentProcessor
,
loadBalanceExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
QUERY_ASSIGNMENT
,
queryAssignmentProcessor
,
loadBalanceExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SET_MESSAGE_REQUEST_MODE
,
queryAssignmentProcessor
,
loadBalanceExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
SET_MESSAGE_REQUEST_MODE
,
queryAssignmentProcessor
,
loadBalanceExecutor
);
/**
* EndTransactionProcessor
*/
...
...
@@ -713,6 +785,10 @@ public class BrokerController {
return
consumerFilterManager
;
}
public
ConsumerOrderInfoManager
getConsumerOrderInfoManager
()
{
return
consumerOrderInfoManager
;
}
public
ConsumerOffsetManager
getConsumerOffsetManager
()
{
return
consumerOffsetManager
;
}
...
...
@@ -741,6 +817,10 @@ public class BrokerController {
return
subscriptionGroupManager
;
}
public
PopMessageProcessor
getPopMessageProcessor
()
{
return
popMessageProcessor
;
}
public
void
shutdown
()
{
if
(
this
.
brokerStatsManager
!=
null
)
{
this
.
brokerStatsManager
.
shutdown
();
...
...
@@ -824,6 +904,11 @@ public class BrokerController {
this
.
consumerManageExecutor
.
shutdown
();
}
{
this
.
popMessageProcessor
.
getPopBufferMergeService
().
shutdown
();
this
.
ackMessageProcessor
.
shutdownPopReviveService
();
}
if
(
this
.
fileWatchService
!=
null
)
{
this
.
fileWatchService
.
shutdown
();
}
...
...
@@ -849,6 +934,8 @@ public class BrokerController {
}
public
void
start
()
throws
Exception
{
this
.
shouldStartTime
=
System
.
currentTimeMillis
();
if
(
this
.
messageStore
!=
null
)
{
this
.
messageStore
.
start
();
}
...
...
@@ -857,6 +944,17 @@ public class BrokerController {
this
.
remotingServer
.
start
();
}
{
this
.
popMessageProcessor
.
getPopLongPollingService
().
start
();
this
.
popMessageProcessor
.
getPopBufferMergeService
().
start
();
this
.
popMessageProcessor
.
getQueueLockManager
().
start
();
this
.
ackMessageProcessor
.
startPopReviveService
();
}
{
assignmentManager
.
start
();
}
if
(
this
.
fastRemotingServer
!=
null
)
{
this
.
fastRemotingServer
.
start
();
}
...
...
@@ -1243,4 +1341,20 @@ public class BrokerController {
public
ExecutorService
getSendMessageExecutor
()
{
return
sendMessageExecutor
;
}
public
long
getShouldStartTime
()
{
return
shouldStartTime
;
}
public
AssignmentManager
getAssignmentManager
()
{
return
assignmentManager
;
}
public
SendMessageProcessor
getSendMessageProcessor
()
{
return
sendMessageProcessor
;
}
public
QueryAssignmentProcessor
getQueryAssignmentProcessor
()
{
return
queryAssignmentProcessor
;
}
}
broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
浏览文件 @
ced6b023
...
...
@@ -39,6 +39,10 @@ public class BrokerPathConfigHelper {
return
rootDir
+
File
.
separator
+
"config"
+
File
.
separator
+
"consumerOffset.json"
;
}
public
static
String
getConsumerOrderInfoPath
(
final
String
rootDir
)
{
return
rootDir
+
File
.
separator
+
"config"
+
File
.
separator
+
"consumerOrderInfo.json"
;
}
public
static
String
getSubscriptionGroupPath
(
final
String
rootDir
)
{
return
rootDir
+
File
.
separator
+
"config"
+
File
.
separator
+
"subscriptionGroup.json"
;
}
...
...
@@ -46,4 +50,8 @@ public class BrokerPathConfigHelper {
public
static
String
getConsumerFilterPath
(
final
String
rootDir
)
{
return
rootDir
+
File
.
separator
+
"config"
+
File
.
separator
+
"consumerFilter.json"
;
}
public
static
String
getMessageRequestModePath
(
final
String
rootDir
)
{
return
rootDir
+
File
.
separator
+
"config"
+
File
.
separator
+
"messageRequestMode.json"
;
}
}
broker/src/main/java/org/apache/rocketmq/broker/loadbalance/AssignmentManager.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.loadbalance
;
import
com.google.common.collect.Lists
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.locks.Lock
;
import
java.util.concurrent.locks.ReentrantLock
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.out.BrokerOuterAPI
;
import
org.apache.rocketmq.client.exception.MQBrokerException
;
import
org.apache.rocketmq.client.impl.factory.MQClientInstance
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.ThreadFactoryImpl
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.route.TopicRouteData
;
import
org.apache.rocketmq.common.topic.TopicValidator
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
public
class
AssignmentManager
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
transient
BrokerController
brokerController
;
private
final
static
long
LOCK_TIMEOUT_MILLIS
=
3000
;
private
final
Lock
lockNamesrv
=
new
ReentrantLock
();
private
final
BrokerOuterAPI
mQClientAPIImpl
;
private
final
ConcurrentHashMap
<
String
,
Set
<
MessageQueue
>>
topicSubscribeInfoTable
=
new
ConcurrentHashMap
<
String
,
Set
<
MessageQueue
>>();
private
ScheduledExecutorService
scheduledExecutorService
=
Executors
.
newSingleThreadScheduledExecutor
(
new
ThreadFactoryImpl
(
"LoadBalanceManagerScheduledThread"
));
private
static
final
List
<
String
>
IGNORE_ROUTE_TOPICS
=
Lists
.
newArrayList
(
TopicValidator
.
SYSTEM_TOPIC_PREFIX
,
MixAll
.
CID_RMQ_SYS_PREFIX
,
MixAll
.
DEFAULT_CONSUMER_GROUP
,
MixAll
.
TOOLS_CONSUMER_GROUP
,
MixAll
.
FILTERSRV_CONSUMER_GROUP
,
MixAll
.
MONITOR_CONSUMER_GROUP
,
MixAll
.
ONS_HTTP_PROXY_GROUP
,
MixAll
.
CID_ONSAPI_PERMISSION_GROUP
,
MixAll
.
CID_ONSAPI_OWNER_GROUP
,
MixAll
.
CID_ONSAPI_PULL_GROUP
);
private
final
List
<
String
>
ignoreRouteTopics
=
Lists
.
newArrayList
(
IGNORE_ROUTE_TOPICS
);
public
AssignmentManager
(
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
this
.
mQClientAPIImpl
=
brokerController
.
getBrokerOuterAPI
();
ignoreRouteTopics
.
add
(
brokerController
.
getBrokerConfig
().
getBrokerClusterName
());
ignoreRouteTopics
.
add
(
brokerController
.
getBrokerConfig
().
getBrokerName
());
}
public
void
start
()
{
this
.
scheduledExecutorService
.
scheduleAtFixedRate
(
new
Runnable
()
{
@Override
public
void
run
()
{
try
{
updateTopicRouteInfoFromNameServer
();
}
catch
(
Exception
e
)
{
log
.
error
(
"ScheduledTask: failed to pull TopicRouteData from NameServer"
,
e
);
}
}
},
13000
,
this
.
brokerController
.
getBrokerConfig
().
getLoadBalancePollNameServerInterval
(),
TimeUnit
.
MILLISECONDS
);
}
public
void
updateTopicRouteInfoFromNameServer
()
{
Set
<
String
>
topicList
=
new
HashSet
<>(
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
keySet
());
LOOP:
for
(
String
topic
:
topicList
)
{
for
(
String
keyword
:
ignoreRouteTopics
)
{
if
(
topic
.
contains
(
keyword
))
{
continue
LOOP
;
}
}
this
.
updateTopicRouteInfoFromNameServer
(
topic
);
}
}
public
boolean
updateTopicRouteInfoFromNameServer
(
final
String
topic
)
{
try
{
TopicRouteData
topicRouteData
=
this
.
mQClientAPIImpl
.
getTopicRouteInfoFromNameServer
(
topic
,
1000
*
3
);
if
(
topicRouteData
!=
null
)
{
Set
<
MessageQueue
>
newSubscribeInfo
=
MQClientInstance
.
topicRouteData2TopicSubscribeInfo
(
topic
,
topicRouteData
);
Set
<
MessageQueue
>
oldSubscribeInfo
=
topicSubscribeInfoTable
.
get
(
topic
);
boolean
changed
=
!
newSubscribeInfo
.
equals
(
oldSubscribeInfo
);
if
(
changed
)
{
log
.
info
(
"the topic[{}] subscribe message queue changed, old[{}] ,new[{}]"
,
topic
,
oldSubscribeInfo
,
newSubscribeInfo
);
topicSubscribeInfoTable
.
put
(
topic
,
newSubscribeInfo
);
return
true
;
}
}
else
{
log
.
warn
(
"updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}"
,
topic
);
}
}
catch
(
Exception
e
)
{
if
(!
topic
.
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
log
.
warn
(
"updateTopicRouteInfoFromNameServer Exception"
,
e
);
if
(
e
instanceof
MQBrokerException
&&
ResponseCode
.
TOPIC_NOT_EXIST
==
((
MQBrokerException
)
e
).
getResponseCode
())
{
// clean no used topic
cleanNoneRouteTopic
(
topic
);
}
}
}
return
false
;
}
private
void
cleanNoneRouteTopic
(
String
topic
)
{
// clean no used topic
topicSubscribeInfoTable
.
remove
(
topic
);
}
public
Set
<
MessageQueue
>
getTopicSubscribeInfo
(
String
topic
)
{
return
topicSubscribeInfoTable
.
get
(
topic
);
}
}
broker/src/main/java/org/apache/rocketmq/broker/loadbalance/MessageRequestModeManager.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.loadbalance
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.BrokerPathConfigHelper
;
import
org.apache.rocketmq.common.ConfigManager
;
import
org.apache.rocketmq.common.protocol.body.SetMessageRequestModeRequestBody
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
public
class
MessageRequestModeManager
extends
ConfigManager
{
private
BrokerController
brokerController
;
private
ConcurrentHashMap
<
String
,
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>>
messageRequestModeMap
=
new
ConcurrentHashMap
<
String
,
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>>();
public
MessageRequestModeManager
()
{
// empty construct for decode
}
public
MessageRequestModeManager
(
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
}
public
void
setMessageRequestMode
(
String
topic
,
String
consumerGroup
,
SetMessageRequestModeRequestBody
requestBody
)
{
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>
consumerGroup2ModeMap
=
messageRequestModeMap
.
get
(
topic
);
if
(
consumerGroup2ModeMap
==
null
)
{
consumerGroup2ModeMap
=
new
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>();
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>
pre
=
messageRequestModeMap
.
putIfAbsent
(
topic
,
consumerGroup2ModeMap
);
if
(
pre
!=
null
)
{
consumerGroup2ModeMap
=
pre
;
}
}
consumerGroup2ModeMap
.
put
(
consumerGroup
,
requestBody
);
}
public
SetMessageRequestModeRequestBody
getMessageRequestMode
(
String
topic
,
String
consumerGroup
)
{
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>
consumerGroup2ModeMap
=
messageRequestModeMap
.
get
(
topic
);
if
(
consumerGroup2ModeMap
!=
null
)
{
return
consumerGroup2ModeMap
.
get
(
consumerGroup
);
}
return
null
;
}
public
ConcurrentHashMap
<
String
,
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>>
getMessageRequestModeMap
()
{
return
this
.
messageRequestModeMap
;
}
public
void
setMessageRequestModeMap
(
ConcurrentHashMap
<
String
,
ConcurrentHashMap
<
String
,
SetMessageRequestModeRequestBody
>>
messageRequestModeMap
)
{
this
.
messageRequestModeMap
=
messageRequestModeMap
;
}
@Override
public
String
encode
()
{
return
this
.
encode
(
false
);
}
@Override
public
String
configFilePath
()
{
return
BrokerPathConfigHelper
.
getMessageRequestModePath
(
this
.
brokerController
.
getMessageStoreConfig
().
getStorePathRootDir
());
}
@Override
public
void
decode
(
String
jsonString
)
{
if
(
jsonString
!=
null
)
{
MessageRequestModeManager
obj
=
RemotingSerializable
.
fromJson
(
jsonString
,
MessageRequestModeManager
.
class
);
if
(
obj
!=
null
)
{
this
.
messageRequestModeMap
=
obj
.
messageRequestModeMap
;
}
}
}
@Override
public
String
encode
(
boolean
prettyFormat
)
{
return
RemotingSerializable
.
toJson
(
this
,
prettyFormat
);
}
}
broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotifyMessageArrivingListener.java
浏览文件 @
ced6b023
...
...
@@ -17,15 +17,17 @@
package
org.apache.rocketmq.broker.longpolling
;
import
org.apache.rocketmq.store.MessageArrivingListener
;
import
java.util.Map
;
import
org.apache.rocketmq.broker.processor.PopMessageProcessor
;
import
org.apache.rocketmq.store.MessageArrivingListener
;
public
class
NotifyMessageArrivingListener
implements
MessageArrivingListener
{
private
final
PullRequestHoldService
pullRequestHoldService
;
private
final
PopMessageProcessor
popMessageProcessor
;
public
NotifyMessageArrivingListener
(
final
PullRequestHoldService
pullRequestHoldService
)
{
public
NotifyMessageArrivingListener
(
final
PullRequestHoldService
pullRequestHoldService
,
final
PopMessageProcessor
popMessageProcessor
)
{
this
.
pullRequestHoldService
=
pullRequestHoldService
;
this
.
popMessageProcessor
=
popMessageProcessor
;
}
@Override
...
...
@@ -33,5 +35,6 @@ public class NotifyMessageArrivingListener implements MessageArrivingListener {
long
msgStoreTime
,
byte
[]
filterBitMap
,
Map
<
String
,
String
>
properties
)
{
this
.
pullRequestHoldService
.
notifyMessageArriving
(
topic
,
queueId
,
logicOffset
,
tagsCode
,
msgStoreTime
,
filterBitMap
,
properties
);
this
.
popMessageProcessor
.
notifyMessageArriving
(
topic
,
queueId
);
}
}
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopRequest.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.longpolling
;
import
io.netty.channel.Channel
;
import
java.util.Comparator
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicLong
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
public
class
PopRequest
{
private
static
final
AtomicLong
COUNTER
=
new
AtomicLong
(
Long
.
MIN_VALUE
);
private
RemotingCommand
remotingCommand
;
private
Channel
channel
;
private
long
expired
;
private
AtomicBoolean
complete
=
new
AtomicBoolean
(
false
);
private
final
long
op
=
COUNTER
.
getAndIncrement
();
public
PopRequest
(
RemotingCommand
remotingCommand
,
Channel
channel
,
long
expired
)
{
this
.
channel
=
channel
;
this
.
remotingCommand
=
remotingCommand
;
this
.
expired
=
expired
;
}
public
Channel
getChannel
()
{
return
channel
;
}
public
RemotingCommand
getRemotingCommand
()
{
return
remotingCommand
;
}
public
boolean
isTimeout
()
{
return
System
.
currentTimeMillis
()
>
(
expired
-
50
);
}
public
boolean
complete
()
{
return
complete
.
compareAndSet
(
false
,
true
);
}
public
long
getExpired
()
{
return
expired
;
}
@Override
public
String
toString
()
{
final
StringBuilder
sb
=
new
StringBuilder
(
"PopRequest{"
);
sb
.
append
(
"cmd="
).
append
(
remotingCommand
);
sb
.
append
(
", channel="
).
append
(
channel
);
sb
.
append
(
", expired="
).
append
(
expired
);
sb
.
append
(
", complete="
).
append
(
complete
);
sb
.
append
(
", op="
).
append
(
op
);
sb
.
append
(
'}'
);
return
sb
.
toString
();
}
public
static
final
Comparator
<
PopRequest
>
COMPARATOR
=
new
Comparator
<
PopRequest
>()
{
@Override
public
int
compare
(
PopRequest
o1
,
PopRequest
o2
)
{
int
ret
=
(
int
)
(
o1
.
getExpired
()
-
o2
.
getExpired
());
if
(
ret
!=
0
)
{
return
ret
;
}
ret
=
(
int
)
(
o1
.
op
-
o2
.
op
);
if
(
ret
!=
0
)
{
return
ret
;
}
return
-
1
;
}
};
}
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.offset
;
import
com.alibaba.fastjson.annotation.JSONField
;
import
java.util.ArrayList
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.BrokerPathConfigHelper
;
import
org.apache.rocketmq.common.ConfigManager
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.protocol.RemotingSerializable
;
public
class
ConsumerOrderInfoManager
extends
ConfigManager
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
static
final
String
TOPIC_GROUP_SEPARATOR
=
"@"
;
private
static
final
long
CLEAN_SPAN_FROM_LAST
=
24
*
3600
*
1000
;
private
ConcurrentHashMap
<
String
/* topic@group*/
,
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>>
table
=
new
ConcurrentHashMap
<>(
128
);
private
transient
BrokerController
brokerController
;
public
ConsumerOrderInfoManager
()
{
}
public
ConsumerOrderInfoManager
(
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
}
public
ConcurrentHashMap
<
String
,
ConcurrentHashMap
<
Integer
,
OrderInfo
>>
getTable
()
{
return
table
;
}
public
void
setTable
(
ConcurrentHashMap
<
String
,
ConcurrentHashMap
<
Integer
,
OrderInfo
>>
table
)
{
this
.
table
=
table
;
}
/**
* not thread safe.
*
* @param topic
* @param group
* @param queueId
* @param msgOffsetList
*/
public
int
update
(
String
topic
,
String
group
,
int
queueId
,
List
<
Long
>
msgOffsetList
)
{
String
key
=
topic
+
TOPIC_GROUP_SEPARATOR
+
group
;
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>
qs
=
table
.
get
(
key
);
if
(
qs
==
null
)
{
qs
=
new
ConcurrentHashMap
<>(
16
);
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>
old
=
table
.
putIfAbsent
(
key
,
qs
);
if
(
old
!=
null
)
{
qs
=
old
;
}
}
OrderInfo
orderInfo
=
qs
.
get
(
queueId
);
// start is same.
List
<
Long
>
simple
=
OrderInfo
.
simpleO
(
msgOffsetList
);
if
(
orderInfo
!=
null
&&
simple
.
get
(
0
).
equals
(
orderInfo
.
getOffsetList
().
get
(
0
)))
{
if
(
simple
.
equals
(
orderInfo
.
getOffsetList
()))
{
orderInfo
.
setConsumedCount
(
orderInfo
.
getConsumedCount
()
+
1
);
}
else
{
// reset, because msgs are changed.
orderInfo
.
setConsumedCount
(
0
);
}
orderInfo
.
setLastConsumeTimestamp
(
System
.
currentTimeMillis
());
orderInfo
.
setOffsetList
(
simple
);
orderInfo
.
setCommitOffsetBit
(
0
);
}
else
{
orderInfo
=
new
OrderInfo
();
orderInfo
.
setOffsetList
(
simple
);
orderInfo
.
setLastConsumeTimestamp
(
System
.
currentTimeMillis
());
orderInfo
.
setConsumedCount
(
0
);
orderInfo
.
setCommitOffsetBit
(
0
);
qs
.
put
(
queueId
,
orderInfo
);
}
return
orderInfo
.
getConsumedCount
();
}
public
boolean
checkBlock
(
String
topic
,
String
group
,
int
queueId
,
long
invisibleTime
)
{
String
key
=
topic
+
TOPIC_GROUP_SEPARATOR
+
group
;
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>
qs
=
table
.
get
(
key
);
if
(
qs
==
null
)
{
qs
=
new
ConcurrentHashMap
<>(
16
);
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>
old
=
table
.
putIfAbsent
(
key
,
qs
);
if
(
old
!=
null
)
{
qs
=
old
;
}
}
OrderInfo
orderInfo
=
qs
.
get
(
queueId
);
if
(
orderInfo
==
null
)
{
return
false
;
}
boolean
isBlock
=
System
.
currentTimeMillis
()
-
orderInfo
.
getLastConsumeTimestamp
()
<
invisibleTime
;
return
isBlock
&&
!
orderInfo
.
isDone
();
}
/**
* @param topic
* @param group
* @param queueId
* @param offset
* @return -1 : illegal, -2 : no need commit, >= 0 : commit
*/
public
long
commitAndNext
(
String
topic
,
String
group
,
int
queueId
,
long
offset
)
{
String
key
=
topic
+
TOPIC_GROUP_SEPARATOR
+
group
;
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>
qs
=
table
.
get
(
key
);
if
(
qs
==
null
)
{
return
offset
+
1
;
}
OrderInfo
orderInfo
=
qs
.
get
(
queueId
);
if
(
orderInfo
==
null
)
{
log
.
warn
(
"OrderInfo is null, {}, {}, {}"
,
key
,
offset
,
orderInfo
);
return
offset
+
1
;
}
List
<
Long
>
offsetList
=
orderInfo
.
getOffsetList
();
if
(
offsetList
==
null
||
offsetList
.
isEmpty
())
{
log
.
warn
(
"OrderInfo is empty, {}, {}, {}"
,
key
,
offset
,
orderInfo
);
return
-
1
;
}
Long
first
=
offsetList
.
get
(
0
);
int
i
=
0
,
size
=
offsetList
.
size
();
for
(;
i
<
size
;
i
++)
{
long
temp
;
if
(
i
==
0
)
{
temp
=
first
;
}
else
{
temp
=
first
+
offsetList
.
get
(
i
);
}
if
(
offset
==
temp
)
{
break
;
}
}
// not found
if
(
i
>=
size
)
{
log
.
warn
(
"OrderInfo not found commit offset, {}, {}, {}"
,
key
,
offset
,
orderInfo
);
return
-
1
;
}
//set bit
orderInfo
.
setCommitOffsetBit
(
orderInfo
.
getCommitOffsetBit
()
|
(
1L
<<
i
));
if
(
orderInfo
.
isDone
())
{
if
(
size
==
1
)
{
return
offsetList
.
get
(
0
)
+
1
;
}
else
{
return
offsetList
.
get
(
size
-
1
)
+
first
+
1
;
}
}
return
-
2
;
}
public
OrderInfo
get
(
String
topic
,
String
group
,
int
queueId
)
{
String
key
=
topic
+
TOPIC_GROUP_SEPARATOR
+
group
;
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>
qs
=
table
.
get
(
key
);
if
(
qs
==
null
)
{
return
null
;
}
return
qs
.
get
(
queueId
);
}
public
int
getConsumeCount
(
String
topic
,
String
group
,
int
queueId
)
{
OrderInfo
orderInfo
=
get
(
topic
,
group
,
queueId
);
return
orderInfo
==
null
?
0
:
orderInfo
.
getConsumedCount
();
}
private
void
autoClean
()
{
if
(
brokerController
==
null
)
{
return
;
}
Iterator
<
Map
.
Entry
<
String
/* topic@group*/
,
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>>>
iterator
=
this
.
table
.
entrySet
().
iterator
();
while
(
iterator
.
hasNext
())
{
Map
.
Entry
<
String
/* topic@group*/
,
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>>
entry
=
iterator
.
next
();
String
topicAtGroup
=
entry
.
getKey
();
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>
qs
=
entry
.
getValue
();
String
[]
arrays
=
topicAtGroup
.
split
(
TOPIC_GROUP_SEPARATOR
);
if
(
arrays
.
length
!=
2
)
{
continue
;
}
String
topic
=
arrays
[
0
];
String
group
=
arrays
[
1
];
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
topic
);
if
(
topicConfig
==
null
)
{
iterator
.
remove
();
log
.
info
(
"Topic not exist, Clean order info, {}:{}"
,
topicAtGroup
,
qs
);
continue
;
}
if
(
this
.
brokerController
.
getSubscriptionGroupManager
().
getSubscriptionGroupTable
().
get
(
group
)
==
null
)
{
iterator
.
remove
();
log
.
info
(
"Group not exist, Clean order info, {}:{}"
,
topicAtGroup
,
qs
);
continue
;
}
if
(
qs
.
isEmpty
())
{
iterator
.
remove
();
log
.
info
(
"Order table is empty, Clean order info, {}:{}"
,
topicAtGroup
,
qs
);
continue
;
}
Iterator
<
Map
.
Entry
<
Integer
/*queueId*/
,
OrderInfo
>>
qsIterator
=
qs
.
entrySet
().
iterator
();
while
(
qsIterator
.
hasNext
())
{
Map
.
Entry
<
Integer
/*queueId*/
,
OrderInfo
>
qsEntry
=
qsIterator
.
next
();
if
(
qsEntry
.
getKey
()
>=
topicConfig
.
getReadQueueNums
())
{
qsIterator
.
remove
();
log
.
info
(
"Queue not exist, Clean order info, {}:{}, {}"
,
topicAtGroup
,
entry
.
getValue
(),
topicConfig
);
continue
;
}
if
(
System
.
currentTimeMillis
()
-
qsEntry
.
getValue
().
getLastConsumeTimestamp
()
>
CLEAN_SPAN_FROM_LAST
)
{
qsIterator
.
remove
();
log
.
info
(
"Not consume long time, Clean order info, {}:{}, {}"
,
topicAtGroup
,
entry
.
getValue
(),
topicConfig
);
continue
;
}
}
}
}
@Override
public
String
encode
()
{
return
this
.
encode
(
false
);
}
@Override
public
String
configFilePath
()
{
if
(
brokerController
!=
null
)
{
return
BrokerPathConfigHelper
.
getConsumerOrderInfoPath
(
this
.
brokerController
.
getMessageStoreConfig
().
getStorePathRootDir
());
}
else
{
return
BrokerPathConfigHelper
.
getConsumerOrderInfoPath
(
"~"
);
}
}
@Override
public
void
decode
(
String
jsonString
)
{
if
(
jsonString
!=
null
)
{
ConsumerOrderInfoManager
obj
=
RemotingSerializable
.
fromJson
(
jsonString
,
ConsumerOrderInfoManager
.
class
);
if
(
obj
!=
null
)
{
this
.
table
=
obj
.
table
;
}
}
}
@Override
public
String
encode
(
boolean
prettyFormat
)
{
this
.
autoClean
();
StringBuilder
stringBuilder
=
new
StringBuilder
();
stringBuilder
.
append
(
"{\n"
).
append
(
"\t\"table\":{"
);
Iterator
<
Map
.
Entry
<
String
/* topic@group*/
,
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>>>
iterator
=
this
.
table
.
entrySet
().
iterator
();
int
count1
=
0
;
while
(
iterator
.
hasNext
())
{
Map
.
Entry
<
String
/* topic@group*/
,
ConcurrentHashMap
<
Integer
/*queueId*/
,
OrderInfo
>>
entry
=
iterator
.
next
();
if
(
count1
>
0
)
{
stringBuilder
.
append
(
","
);
}
stringBuilder
.
append
(
"\n\t\t\""
).
append
(
entry
.
getKey
()).
append
(
"\":{"
);
Iterator
<
Map
.
Entry
<
Integer
/*queueId*/
,
OrderInfo
>>
qsIterator
=
entry
.
getValue
().
entrySet
().
iterator
();
int
count2
=
0
;
while
(
qsIterator
.
hasNext
())
{
Map
.
Entry
<
Integer
/*queueId*/
,
OrderInfo
>
qsEntry
=
qsIterator
.
next
();
if
(
count2
>
0
)
{
stringBuilder
.
append
(
","
);
}
stringBuilder
.
append
(
"\n\t\t\t"
).
append
(
qsEntry
.
getKey
()).
append
(
":"
)
.
append
(
qsEntry
.
getValue
().
encode
());
count2
++;
}
stringBuilder
.
append
(
"\n\t\t}"
);
count1
++;
}
stringBuilder
.
append
(
"\n\t}"
).
append
(
"\n}"
);
return
stringBuilder
.
toString
();
}
public
static
class
OrderInfo
{
/**
* offset
*/
private
List
<
Long
>
offsetList
;
/**
* consumed count
*/
private
int
consumedCount
;
/**
* last consume timestamp
*/
private
long
lastConsumeTimestamp
;
/**
* commit offset bit
*/
private
long
commitOffsetBit
;
public
OrderInfo
()
{
}
public
List
<
Long
>
getOffsetList
()
{
return
offsetList
;
}
public
void
setOffsetList
(
List
<
Long
>
offsetList
)
{
this
.
offsetList
=
offsetList
;
}
public
static
List
<
Long
>
simpleO
(
List
<
Long
>
offsetList
)
{
List
<
Long
>
simple
=
new
ArrayList
<>();
if
(
offsetList
.
size
()
==
1
)
{
simple
.
addAll
(
offsetList
);
return
simple
;
}
Long
first
=
offsetList
.
get
(
0
);
simple
.
add
(
first
);
for
(
int
i
=
1
;
i
<
offsetList
.
size
();
i
++)
{
simple
.
add
(
offsetList
.
get
(
i
)
-
first
);
}
return
simple
;
}
public
int
getConsumedCount
()
{
return
consumedCount
;
}
public
void
setConsumedCount
(
int
consumedCount
)
{
this
.
consumedCount
=
consumedCount
;
}
public
long
getLastConsumeTimestamp
()
{
return
lastConsumeTimestamp
;
}
public
void
setLastConsumeTimestamp
(
long
lastConsumeTimestamp
)
{
this
.
lastConsumeTimestamp
=
lastConsumeTimestamp
;
}
public
long
getCommitOffsetBit
()
{
return
commitOffsetBit
;
}
public
void
setCommitOffsetBit
(
long
commitOffsetBit
)
{
this
.
commitOffsetBit
=
commitOffsetBit
;
}
@JSONField
(
serialize
=
false
,
deserialize
=
false
)
public
boolean
isDone
()
{
if
(
offsetList
==
null
||
offsetList
.
isEmpty
())
{
return
true
;
}
int
num
=
offsetList
.
size
();
for
(
byte
i
=
0
;
i
<
num
;
i
++)
{
if
((
commitOffsetBit
&
(
1L
<<
i
))
==
0
)
{
return
false
;
}
}
return
true
;
}
@JSONField
(
serialize
=
false
,
deserialize
=
false
)
public
String
encode
()
{
StringBuilder
sb
=
new
StringBuilder
();
sb
.
append
(
"{"
).
append
(
"\"c\":"
).
append
(
getConsumedCount
());
sb
.
append
(
","
).
append
(
"\"cm\":"
).
append
(
getCommitOffsetBit
());
sb
.
append
(
","
).
append
(
"\"l\":"
).
append
(
getLastConsumeTimestamp
());
sb
.
append
(
","
).
append
(
"\"o\":["
);
if
(
getOffsetList
()
!=
null
)
{
for
(
int
i
=
0
;
i
<
getOffsetList
().
size
();
i
++)
{
sb
.
append
(
getOffsetList
().
get
(
i
));
if
(
i
<
getOffsetList
().
size
()
-
1
)
{
sb
.
append
(
","
);
}
}
}
sb
.
append
(
"]"
).
append
(
"}"
);
return
sb
.
toString
();
}
@Override
public
String
toString
()
{
final
StringBuilder
sb
=
new
StringBuilder
(
"OrderInfo"
);
sb
.
append
(
"@"
).
append
(
this
.
hashCode
());
sb
.
append
(
"{offsetList="
).
append
(
offsetList
);
sb
.
append
(
", consumedCount="
).
append
(
consumedCount
);
sb
.
append
(
", lastConsumeTimestamp="
).
append
(
lastConsumeTimestamp
);
sb
.
append
(
", commitOffsetBit="
).
append
(
commitOffsetBit
);
sb
.
append
(
", isDone="
).
append
(
isDone
());
sb
.
append
(
'}'
);
return
sb
.
toString
();
}
}
}
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java
浏览文件 @
ced6b023
...
...
@@ -30,8 +30,6 @@ import org.apache.rocketmq.common.MixAll;
import
org.apache.rocketmq.common.ThreadFactoryImpl
;
import
org.apache.rocketmq.common.UtilAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.common.namesrv.RegisterBrokerResult
;
import
org.apache.rocketmq.common.namesrv.TopAddressing
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
...
...
@@ -41,15 +39,20 @@ import org.apache.rocketmq.common.protocol.body.KVTable;
import
org.apache.rocketmq.common.protocol.body.RegisterBrokerBody
;
import
org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper
;
import
org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper
;
import
org.apache.rocketmq.common.protocol.header.namesrv.GetRouteInfoRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.QueryDataVersionResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.RegisterBrokerResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.namesrv.UnRegisterBrokerRequestHeader
;
import
org.apache.rocketmq.common.protocol.route.TopicRouteData
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.RPCHook
;
import
org.apache.rocketmq.remoting.RemotingClient
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException
;
...
...
@@ -394,4 +397,39 @@ public class BrokerOuterAPI {
public
void
registerRPCHook
(
RPCHook
rpcHook
)
{
remotingClient
.
registerRPCHook
(
rpcHook
);
}
public
TopicRouteData
getTopicRouteInfoFromNameServer
(
final
String
topic
,
final
long
timeoutMillis
)
throws
RemotingException
,
MQBrokerException
,
InterruptedException
{
return
getTopicRouteInfoFromNameServer
(
topic
,
timeoutMillis
,
true
);
}
public
TopicRouteData
getTopicRouteInfoFromNameServer
(
final
String
topic
,
final
long
timeoutMillis
,
boolean
allowTopicNotExist
)
throws
MQBrokerException
,
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
GetRouteInfoRequestHeader
requestHeader
=
new
GetRouteInfoRequestHeader
();
requestHeader
.
setTopic
(
topic
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
GET_ROUTEINFO_BY_TOPIC
,
requestHeader
);
RemotingCommand
response
=
this
.
remotingClient
.
invokeSync
(
null
,
request
,
timeoutMillis
);
assert
response
!=
null
;
switch
(
response
.
getCode
())
{
case
ResponseCode
.
TOPIC_NOT_EXIST
:
{
if
(
allowTopicNotExist
)
{
log
.
warn
(
"get Topic [{}] RouteInfoFromNameServer is not exist value"
,
topic
);
}
break
;
}
case
ResponseCode
.
SUCCESS
:
{
byte
[]
body
=
response
.
getBody
();
if
(
body
!=
null
)
{
return
TopicRouteData
.
decode
(
body
,
TopicRouteData
.
class
);
}
}
default
:
break
;
}
throw
new
MQBrokerException
(
response
.
getCode
(),
response
.
getRemark
());
}
}
broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
com.alibaba.fastjson.JSON
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.util.MsgUtil
;
import
org.apache.rocketmq.common.KeyBuilder
;
import
org.apache.rocketmq.common.PopAckConstants
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ExtraInfoUtil
;
import
org.apache.rocketmq.common.utils.DataConverter
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
org.apache.rocketmq.store.PutMessageResult
;
import
org.apache.rocketmq.store.PutMessageStatus
;
import
org.apache.rocketmq.store.pop.AckMsg
;
public
class
AckMessageProcessor
implements
NettyRequestProcessor
{
private
static
final
InternalLogger
POP_LOGGER
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
ROCKETMQ_POP_LOGGER_NAME
);
private
final
BrokerController
brokerController
;
private
String
reviveTopic
;
private
PopReviveService
[]
popReviveServices
;
public
AckMessageProcessor
(
final
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
this
.
reviveTopic
=
PopAckConstants
.
REVIVE_TOPIC
+
this
.
brokerController
.
getBrokerConfig
().
getBrokerClusterName
();
this
.
popReviveServices
=
new
PopReviveService
[
this
.
brokerController
.
getBrokerConfig
().
getReviveQueueNum
()];
for
(
int
i
=
0
;
i
<
this
.
brokerController
.
getBrokerConfig
().
getReviveQueueNum
();
i
++)
{
this
.
popReviveServices
[
i
]
=
new
PopReviveService
(
i
,
brokerController
,
reviveTopic
);
}
}
@Override
public
RemotingCommand
processRequest
(
final
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
return
this
.
processRequest
(
ctx
.
channel
(),
request
,
true
);
}
@Override
public
boolean
rejectRequest
()
{
return
false
;
}
public
void
startPopReviveService
()
{
for
(
PopReviveService
popReviveService
:
popReviveServices
)
{
popReviveService
.
start
();
}
}
public
void
shutdownPopReviveService
()
{
for
(
PopReviveService
popReviveService
:
popReviveServices
)
{
popReviveService
.
shutdown
();
}
}
private
RemotingCommand
processRequest
(
final
Channel
channel
,
RemotingCommand
request
,
boolean
brokerAllowSuspend
)
throws
RemotingCommandException
{
final
AckMessageRequestHeader
requestHeader
=
(
AckMessageRequestHeader
)
request
.
decodeCommandCustomHeader
(
AckMessageRequestHeader
.
class
);
MessageExtBrokerInner
msgInner
=
new
MessageExtBrokerInner
();
AckMsg
ackMsg
=
new
AckMsg
();
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
ResponseCode
.
SUCCESS
,
null
);
response
.
setOpaque
(
request
.
getOpaque
());
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
requestHeader
.
getTopic
());
if
(
null
==
topicConfig
)
{
POP_LOGGER
.
error
(
"The topic {} not exist, consumer: {} "
,
requestHeader
.
getTopic
(),
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
response
.
setCode
(
ResponseCode
.
TOPIC_NOT_EXIST
);
response
.
setRemark
(
String
.
format
(
"topic[%s] not exist, apply first please! %s"
,
requestHeader
.
getTopic
(),
FAQUrl
.
suggestTodo
(
FAQUrl
.
APPLY_TOPIC_URL
)));
return
response
;
}
if
(
requestHeader
.
getQueueId
()
>=
topicConfig
.
getReadQueueNums
()
||
requestHeader
.
getQueueId
()
<
0
)
{
String
errorInfo
=
String
.
format
(
"queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]"
,
requestHeader
.
getQueueId
(),
requestHeader
.
getTopic
(),
topicConfig
.
getReadQueueNums
(),
channel
.
remoteAddress
());
POP_LOGGER
.
warn
(
errorInfo
);
response
.
setCode
(
ResponseCode
.
MESSAGE_ILLEGAL
);
response
.
setRemark
(
errorInfo
);
return
response
;
}
long
minOffset
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQueue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
long
maxOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQueue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
if
(
requestHeader
.
getOffset
()
<
minOffset
||
requestHeader
.
getOffset
()
>
maxOffset
)
{
response
.
setCode
(
ResponseCode
.
NO_MESSAGE
);
return
response
;
}
String
[]
extraInfo
=
ExtraInfoUtil
.
split
(
requestHeader
.
getExtraInfo
());
ackMsg
.
setAckOffset
(
requestHeader
.
getOffset
());
ackMsg
.
setStartOffset
(
ExtraInfoUtil
.
getCkQueueOffset
(
extraInfo
));
ackMsg
.
setConsumerGroup
(
requestHeader
.
getConsumerGroup
());
ackMsg
.
setTopic
(
requestHeader
.
getTopic
());
ackMsg
.
setQueueId
(
requestHeader
.
getQueueId
());
ackMsg
.
setPopTime
(
ExtraInfoUtil
.
getPopTime
(
extraInfo
));
int
rqId
=
ExtraInfoUtil
.
getReviveQid
(
extraInfo
);
if
(
rqId
==
KeyBuilder
.
POP_ORDER_REVIVE_QUEUE
)
{
// order
String
lockKey
=
requestHeader
.
getTopic
()
+
PopAckConstants
.
SPLIT
+
requestHeader
.
getConsumerGroup
()
+
PopAckConstants
.
SPLIT
+
requestHeader
.
getQueueId
();
long
oldOffset
=
this
.
brokerController
.
getConsumerOffsetManager
().
queryOffset
(
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
if
(
requestHeader
.
getOffset
()
<
oldOffset
)
{
return
response
;
}
while
(!
this
.
brokerController
.
getPopMessageProcessor
().
getQueueLockManager
().
tryLock
(
lockKey
))
{
}
try
{
oldOffset
=
this
.
brokerController
.
getConsumerOffsetManager
().
queryOffset
(
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
if
(
requestHeader
.
getOffset
()
<
oldOffset
)
{
return
response
;
}
long
nextOffset
=
brokerController
.
getConsumerOrderInfoManager
().
commitAndNext
(
requestHeader
.
getTopic
(),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getQueueId
(),
requestHeader
.
getOffset
());
if
(
nextOffset
>
-
1
)
{
this
.
brokerController
.
getConsumerOffsetManager
().
commitOffset
(
channel
.
remoteAddress
().
toString
(),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
(),
nextOffset
);
this
.
brokerController
.
getPopMessageProcessor
().
notifyMessageArriving
(
requestHeader
.
getTopic
(),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getQueueId
());
}
else
if
(
nextOffset
==
-
1
)
{
String
errorInfo
=
String
.
format
(
"offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s"
,
lockKey
,
oldOffset
,
requestHeader
.
getOffset
(),
nextOffset
,
channel
.
remoteAddress
());
POP_LOGGER
.
warn
(
errorInfo
);
response
.
setCode
(
ResponseCode
.
MESSAGE_ILLEGAL
);
response
.
setRemark
(
errorInfo
);
return
response
;
}
}
finally
{
this
.
brokerController
.
getPopMessageProcessor
().
getQueueLockManager
().
unLock
(
lockKey
);
}
return
response
;
}
if
(
this
.
brokerController
.
getPopMessageProcessor
().
getPopBufferMergeService
().
addAk
(
rqId
,
ackMsg
))
{
return
response
;
}
msgInner
.
setTopic
(
reviveTopic
);
msgInner
.
setBody
(
JSON
.
toJSONString
(
ackMsg
).
getBytes
(
DataConverter
.
charset
));
//msgInner.setQueueId(Integer.valueOf(extraInfo[3]));
msgInner
.
setQueueId
(
rqId
);
msgInner
.
setTags
(
PopAckConstants
.
ACK_TAG
);
msgInner
.
setBornTimestamp
(
System
.
currentTimeMillis
());
msgInner
.
setBornHost
(
this
.
brokerController
.
getStoreHost
());
msgInner
.
setStoreHost
(
this
.
brokerController
.
getStoreHost
());
MsgUtil
.
setMessageDeliverTime
(
this
.
brokerController
,
msgInner
,
ExtraInfoUtil
.
getPopTime
(
extraInfo
)
+
ExtraInfoUtil
.
getInvisibleTime
(
extraInfo
));
msgInner
.
getProperties
().
put
(
MessageConst
.
PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
,
PopMessageProcessor
.
genAckUniqueId
(
ackMsg
));
msgInner
.
setPropertiesString
(
MessageDecoder
.
messageProperties2String
(
msgInner
.
getProperties
()));
PutMessageResult
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessage
(
msgInner
);
if
(
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
PUT_OK
&&
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
FLUSH_DISK_TIMEOUT
&&
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
FLUSH_SLAVE_TIMEOUT
&&
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
SLAVE_NOT_AVAILABLE
)
{
POP_LOGGER
.
error
(
"put ack msg error:"
+
putMessageResult
);
}
return
response
;
}
}
broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
com.alibaba.fastjson.JSON
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.util.MsgUtil
;
import
org.apache.rocketmq.common.PopAckConstants
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.ExtraInfoUtil
;
import
org.apache.rocketmq.common.utils.DataConverter
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
org.apache.rocketmq.store.PutMessageResult
;
import
org.apache.rocketmq.store.PutMessageStatus
;
import
org.apache.rocketmq.store.pop.AckMsg
;
import
org.apache.rocketmq.store.pop.PopCheckPoint
;
public
class
ChangeInvisibleTimeProcessor
implements
NettyRequestProcessor
{
private
static
final
InternalLogger
POP_LOGGER
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
ROCKETMQ_POP_LOGGER_NAME
);
private
final
BrokerController
brokerController
;
private
String
reviveTopic
;
public
ChangeInvisibleTimeProcessor
(
final
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
this
.
reviveTopic
=
PopAckConstants
.
REVIVE_TOPIC
+
this
.
brokerController
.
getBrokerConfig
().
getBrokerClusterName
();
}
@Override
public
RemotingCommand
processRequest
(
final
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
return
this
.
processRequest
(
ctx
.
channel
(),
request
,
true
);
}
@Override
public
boolean
rejectRequest
()
{
return
false
;
}
private
RemotingCommand
processRequest
(
final
Channel
channel
,
RemotingCommand
request
,
boolean
brokerAllowSuspend
)
throws
RemotingCommandException
{
final
ChangeInvisibleTimeRequestHeader
requestHeader
=
(
ChangeInvisibleTimeRequestHeader
)
request
.
decodeCommandCustomHeader
(
ChangeInvisibleTimeRequestHeader
.
class
);
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
ChangeInvisibleTimeResponseHeader
.
class
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setOpaque
(
request
.
getOpaque
());
final
ChangeInvisibleTimeResponseHeader
responseHeader
=
(
ChangeInvisibleTimeResponseHeader
)
response
.
readCustomHeader
();
TopicConfig
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
selectTopicConfig
(
requestHeader
.
getTopic
());
if
(
null
==
topicConfig
)
{
POP_LOGGER
.
error
(
"The topic {} not exist, consumer: {} "
,
requestHeader
.
getTopic
(),
RemotingHelper
.
parseChannelRemoteAddr
(
channel
));
response
.
setCode
(
ResponseCode
.
TOPIC_NOT_EXIST
);
response
.
setRemark
(
String
.
format
(
"topic[%s] not exist, apply first please! %s"
,
requestHeader
.
getTopic
(),
FAQUrl
.
suggestTodo
(
FAQUrl
.
APPLY_TOPIC_URL
)));
return
response
;
}
if
(
requestHeader
.
getQueueId
()
>=
topicConfig
.
getReadQueueNums
()
||
requestHeader
.
getQueueId
()
<
0
)
{
String
errorInfo
=
String
.
format
(
"queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]"
,
requestHeader
.
getQueueId
(),
requestHeader
.
getTopic
(),
topicConfig
.
getReadQueueNums
(),
channel
.
remoteAddress
());
POP_LOGGER
.
warn
(
errorInfo
);
response
.
setCode
(
ResponseCode
.
MESSAGE_ILLEGAL
);
response
.
setRemark
(
errorInfo
);
return
response
;
}
long
minOffset
=
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQueue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
long
maxOffset
=
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQueue
(
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
if
(
requestHeader
.
getOffset
()
<
minOffset
||
requestHeader
.
getOffset
()
>
maxOffset
)
{
response
.
setCode
(
ResponseCode
.
NO_MESSAGE
);
return
response
;
}
String
[]
extraInfo
=
ExtraInfoUtil
.
split
(
requestHeader
.
getExtraInfo
());
// add new ck
long
now
=
System
.
currentTimeMillis
();
PutMessageResult
ckResult
=
appendCheckPoint
(
requestHeader
,
ExtraInfoUtil
.
getReviveQid
(
extraInfo
),
requestHeader
.
getQueueId
(),
requestHeader
.
getOffset
(),
now
);
if
(
ckResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
PUT_OK
&&
ckResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
FLUSH_DISK_TIMEOUT
&&
ckResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
FLUSH_SLAVE_TIMEOUT
&&
ckResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
SLAVE_NOT_AVAILABLE
)
{
POP_LOGGER
.
error
(
"change Invisible, put new ck error: {}"
,
ckResult
);
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
return
response
;
}
// ack old msg.
try
{
ackOrigin
(
requestHeader
,
extraInfo
);
}
catch
(
Throwable
e
)
{
POP_LOGGER
.
error
(
"change Invisible, put ack msg error: {}, {}"
,
requestHeader
.
getExtraInfo
(),
e
.
getMessage
());
// cancel new ck?
}
responseHeader
.
setInvisibleTime
(
requestHeader
.
getInvisibleTime
());
responseHeader
.
setPopTime
(
now
);
responseHeader
.
setReviveQid
(
ExtraInfoUtil
.
getReviveQid
(
extraInfo
));
return
response
;
}
private
void
ackOrigin
(
final
ChangeInvisibleTimeRequestHeader
requestHeader
,
String
[]
extraInfo
)
{
MessageExtBrokerInner
msgInner
=
new
MessageExtBrokerInner
();
AckMsg
ackMsg
=
new
AckMsg
();
ackMsg
.
setAckOffset
(
requestHeader
.
getOffset
());
ackMsg
.
setStartOffset
(
ExtraInfoUtil
.
getCkQueueOffset
(
extraInfo
));
ackMsg
.
setConsumerGroup
(
requestHeader
.
getConsumerGroup
());
ackMsg
.
setTopic
(
requestHeader
.
getTopic
());
ackMsg
.
setQueueId
(
requestHeader
.
getQueueId
());
ackMsg
.
setPopTime
(
ExtraInfoUtil
.
getPopTime
(
extraInfo
));
int
rqId
=
ExtraInfoUtil
.
getReviveQid
(
extraInfo
);
if
(
brokerController
.
getPopMessageProcessor
().
getPopBufferMergeService
().
addAk
(
rqId
,
ackMsg
))
{
return
;
}
msgInner
.
setTopic
(
reviveTopic
);
msgInner
.
setBody
(
JSON
.
toJSONString
(
ackMsg
).
getBytes
(
DataConverter
.
charset
));
msgInner
.
setQueueId
(
rqId
);
msgInner
.
setTags
(
PopAckConstants
.
ACK_TAG
);
msgInner
.
setBornTimestamp
(
System
.
currentTimeMillis
());
msgInner
.
setBornHost
(
this
.
brokerController
.
getStoreHost
());
msgInner
.
setStoreHost
(
this
.
brokerController
.
getStoreHost
());
MsgUtil
.
setMessageDeliverTime
(
this
.
brokerController
,
msgInner
,
ExtraInfoUtil
.
getPopTime
(
extraInfo
)
+
ExtraInfoUtil
.
getInvisibleTime
(
extraInfo
));
msgInner
.
getProperties
().
put
(
MessageConst
.
PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
,
PopMessageProcessor
.
genAckUniqueId
(
ackMsg
));
msgInner
.
setPropertiesString
(
MessageDecoder
.
messageProperties2String
(
msgInner
.
getProperties
()));
PutMessageResult
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessage
(
msgInner
);
if
(
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
PUT_OK
&&
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
FLUSH_DISK_TIMEOUT
&&
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
FLUSH_SLAVE_TIMEOUT
&&
putMessageResult
.
getPutMessageStatus
()
!=
PutMessageStatus
.
SLAVE_NOT_AVAILABLE
)
{
POP_LOGGER
.
error
(
"change Invisible, put ack msg fail: {}, {}"
,
ackMsg
,
putMessageResult
);
}
}
private
PutMessageResult
appendCheckPoint
(
final
ChangeInvisibleTimeRequestHeader
requestHeader
,
int
reviveQid
,
int
queueId
,
long
offset
,
long
popTime
)
{
// add check point msg to revive log
MessageExtBrokerInner
msgInner
=
new
MessageExtBrokerInner
();
msgInner
.
setTopic
(
reviveTopic
);
PopCheckPoint
ck
=
new
PopCheckPoint
();
ck
.
setBitMap
(
0
);
ck
.
setNum
((
byte
)
1
);
ck
.
setPopTime
(
popTime
);
ck
.
setInvisibleTime
(
requestHeader
.
getInvisibleTime
());
ck
.
getStartOffset
(
offset
);
ck
.
setCId
(
requestHeader
.
getConsumerGroup
());
ck
.
setTopic
(
requestHeader
.
getTopic
());
ck
.
setQueueId
((
byte
)
queueId
);
ck
.
addDiff
(
0
);
msgInner
.
setBody
(
JSON
.
toJSONString
(
ck
).
getBytes
(
DataConverter
.
charset
));
msgInner
.
setQueueId
(
reviveQid
);
msgInner
.
setTags
(
PopAckConstants
.
CK_TAG
);
msgInner
.
setBornTimestamp
(
System
.
currentTimeMillis
());
msgInner
.
setBornHost
(
this
.
brokerController
.
getStoreHost
());
msgInner
.
setStoreHost
(
this
.
brokerController
.
getStoreHost
());
MsgUtil
.
setMessageDeliverTime
(
this
.
brokerController
,
msgInner
,
ck
.
getReviveTime
()
-
PopAckConstants
.
ackTimeInterval
);
msgInner
.
getProperties
().
put
(
MessageConst
.
PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX
,
PopMessageProcessor
.
genCkUniqueId
(
ck
));
msgInner
.
setPropertiesString
(
MessageDecoder
.
messageProperties2String
(
msgInner
.
getProperties
()));
PutMessageResult
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessage
(
msgInner
);
if
(
brokerController
.
getBrokerConfig
().
isEnablePopLog
())
{
POP_LOGGER
.
info
(
"change Invisible , appendCheckPoint, topic {}, queueId {},reviveId {}, cid {}, startOffset {}, rt {}, result {}"
,
requestHeader
.
getTopic
(),
queueId
,
reviveQid
,
requestHeader
.
getConsumerGroup
(),
offset
,
ck
.
getReviveTime
(),
putMessageResult
);
}
return
putMessageResult
;
}
}
broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
0 → 100644
浏览文件 @
ced6b023
此差异已折叠。
点击以展开。
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
0 → 100644
浏览文件 @
ced6b023
此差异已折叠。
点击以展开。
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
0 → 100644
浏览文件 @
ced6b023
此差异已折叠。
点击以展开。
broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.HashSet
;
import
java.util.List
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ConsumerGroupInfo
;
import
org.apache.rocketmq.broker.loadbalance.AssignmentManager
;
import
org.apache.rocketmq.broker.loadbalance.MessageRequestModeManager
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely
;
import
org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragelyByCircle
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageQueueAssignment
;
import
org.apache.rocketmq.common.message.MessageRequestMode
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.body.QueryAssignmentRequestBody
;
import
org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody
;
import
org.apache.rocketmq.common.protocol.body.SetMessageRequestModeRequestBody
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.NettyRequestProcessor
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
public
class
QueryAssignmentProcessor
implements
NettyRequestProcessor
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
private
final
BrokerController
brokerController
;
private
final
ConcurrentHashMap
<
String
,
AllocateMessageQueueStrategy
>
name2LoadStrategy
=
new
ConcurrentHashMap
<
String
,
AllocateMessageQueueStrategy
>();
private
MessageRequestModeManager
messageRequestModeManager
;
public
QueryAssignmentProcessor
(
final
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
//register strategy
//NOTE: init with broker's log instead of init with ClientLogger.getLog();
AllocateMessageQueueAveragely
allocateMessageQueueAveragely
=
new
AllocateMessageQueueAveragely
(
log
);
name2LoadStrategy
.
put
(
allocateMessageQueueAveragely
.
getName
(),
allocateMessageQueueAveragely
);
AllocateMessageQueueAveragelyByCircle
allocateMessageQueueAveragelyByCircle
=
new
AllocateMessageQueueAveragelyByCircle
(
log
);
name2LoadStrategy
.
put
(
allocateMessageQueueAveragelyByCircle
.
getName
(),
allocateMessageQueueAveragelyByCircle
);
this
.
messageRequestModeManager
=
new
MessageRequestModeManager
(
brokerController
);
this
.
messageRequestModeManager
.
load
();
}
@Override
public
RemotingCommand
processRequest
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
switch
(
request
.
getCode
())
{
case
RequestCode
.
QUERY_ASSIGNMENT
:
return
this
.
queryAssignment
(
ctx
,
request
);
case
RequestCode
.
SET_MESSAGE_REQUEST_MODE
:
return
this
.
setMessageRequestMode
(
ctx
,
request
);
default
:
break
;
}
return
null
;
}
@Override
public
boolean
rejectRequest
()
{
return
false
;
}
/**
*
*/
private
RemotingCommand
queryAssignment
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
final
QueryAssignmentRequestBody
requestBody
=
QueryAssignmentRequestBody
.
decode
(
request
.
getBody
(),
QueryAssignmentRequestBody
.
class
);
final
String
topic
=
requestBody
.
getTopic
();
final
String
consumerGroup
=
requestBody
.
getConsumerGroup
();
final
String
clientId
=
requestBody
.
getClientId
();
final
MessageModel
messageModel
=
requestBody
.
getMessageModel
();
final
String
strategyName
=
requestBody
.
getStrategyName
();
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
final
QueryAssignmentResponseBody
responseBody
=
new
QueryAssignmentResponseBody
();
SetMessageRequestModeRequestBody
setMessageRequestModeRequestBody
=
this
.
messageRequestModeManager
.
getMessageRequestMode
(
topic
,
consumerGroup
);
if
(
setMessageRequestModeRequestBody
==
null
)
{
setMessageRequestModeRequestBody
=
new
SetMessageRequestModeRequestBody
();
setMessageRequestModeRequestBody
.
setTopic
(
topic
);
setMessageRequestModeRequestBody
.
setConsumerGroup
(
consumerGroup
);
if
(
topic
.
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
// retry topic must be pull mode
setMessageRequestModeRequestBody
.
setMode
(
MessageRequestMode
.
PULL
);
}
else
{
setMessageRequestModeRequestBody
.
setMode
(
brokerController
.
getBrokerConfig
().
getDefaultMessageRequestMode
());
}
if
(
setMessageRequestModeRequestBody
.
getMode
()
==
MessageRequestMode
.
POP
)
{
setMessageRequestModeRequestBody
.
setPopShareQueueNum
(
brokerController
.
getBrokerConfig
().
getDefaultPopShareQueueNum
());
}
}
Set
<
MessageQueue
>
messageQueues
=
doLoadBalance
(
topic
,
consumerGroup
,
clientId
,
messageModel
,
strategyName
,
setMessageRequestModeRequestBody
,
ctx
);
Set
<
MessageQueueAssignment
>
assignments
=
null
;
if
(
messageQueues
!=
null
)
{
assignments
=
new
HashSet
<
MessageQueueAssignment
>();
for
(
MessageQueue
messageQueue
:
messageQueues
)
{
MessageQueueAssignment
messageQueueAssignment
=
new
MessageQueueAssignment
();
messageQueueAssignment
.
setMessageQueue
(
messageQueue
);
if
(
setMessageRequestModeRequestBody
!=
null
)
{
messageQueueAssignment
.
setMode
(
setMessageRequestModeRequestBody
.
getMode
());
}
assignments
.
add
(
messageQueueAssignment
);
}
}
responseBody
.
setMessageQueueAssignments
(
assignments
);
response
.
setBody
(
responseBody
.
encode
());
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
return
response
;
}
/**
* Returns empty set means the client should clear all load assigned to it before, null means invalid result and the
* client should skip the update logic
*
* @param topic
* @param consumerGroup
* @param clientId
* @param messageModel
* @param strategyName
* @return the MessageQueues assigned to this client
*/
private
Set
<
MessageQueue
>
doLoadBalance
(
final
String
topic
,
final
String
consumerGroup
,
final
String
clientId
,
final
MessageModel
messageModel
,
final
String
strategyName
,
SetMessageRequestModeRequestBody
setMessageRequestModeRequestBody
,
final
ChannelHandlerContext
ctx
)
{
Set
<
MessageQueue
>
assignedQueueSet
=
null
;
AssignmentManager
assignmentManager
=
brokerController
.
getAssignmentManager
();
switch
(
messageModel
)
{
case
BROADCASTING:
{
assignedQueueSet
=
assignmentManager
.
getTopicSubscribeInfo
(
topic
);
if
(
assignedQueueSet
==
null
)
{
log
.
warn
(
"QueryLoad: no assignment for group[{}], the topic[{}] does not exist."
,
consumerGroup
,
topic
);
}
break
;
}
case
CLUSTERING:
{
Set
<
MessageQueue
>
mqSet
=
assignmentManager
.
getTopicSubscribeInfo
(
topic
);
if
(
null
==
mqSet
)
{
if
(!
topic
.
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
log
.
warn
(
"QueryLoad: no assignment for group[{}], the topic[{}] does not exist."
,
consumerGroup
,
topic
);
}
return
null
;
}
if
(!
brokerController
.
getBrokerConfig
().
isServerLoadBalancerEnabled
())
{
return
mqSet
;
}
List
<
String
>
cidAll
=
null
;
ConsumerGroupInfo
consumerGroupInfo
=
this
.
brokerController
.
getConsumerManager
().
getConsumerGroupInfo
(
consumerGroup
);
if
(
consumerGroupInfo
!=
null
)
{
cidAll
=
consumerGroupInfo
.
getAllClientId
();
}
if
(
null
==
cidAll
)
{
log
.
warn
(
"QueryLoad: no assignment for group[{}] topic[{}], get consumer id list failed"
,
consumerGroup
,
topic
);
return
null
;
}
List
<
MessageQueue
>
mqAll
=
new
ArrayList
<
MessageQueue
>();
mqAll
.
addAll
(
mqSet
);
Collections
.
sort
(
mqAll
);
Collections
.
sort
(
cidAll
);
List
<
MessageQueue
>
allocateResult
=
null
;
try
{
AllocateMessageQueueStrategy
allocateMessageQueueStrategy
=
name2LoadStrategy
.
get
(
strategyName
);
if
(
null
==
allocateMessageQueueStrategy
)
{
log
.
warn
(
"QueryLoad: unsupported strategy [{}], {}"
,
consumerGroup
,
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
()));
return
null
;
}
if
(
setMessageRequestModeRequestBody
!=
null
&&
setMessageRequestModeRequestBody
.
getMode
()
==
MessageRequestMode
.
POP
)
{
if
(
setMessageRequestModeRequestBody
.
getPopShareQueueNum
()
<=
0
)
{
//each client pop all messagequeue
allocateResult
=
new
ArrayList
<>(
mqAll
.
size
());
for
(
MessageQueue
mq
:
mqAll
)
{
//must create new MessageQueue in case of change cache in AssignmentManager
MessageQueue
newMq
=
new
MessageQueue
(
mq
.
getTopic
(),
mq
.
getBrokerName
(),
-
1
);
allocateResult
.
add
(
newMq
);
}
}
else
{
if
(
cidAll
.
size
()
<=
mqAll
.
size
())
{
//consumer working in pop mode could share the MessageQueues assigned to the N (N = popWorkGroupSize) consumer following it in the cid list
allocateResult
=
allocateMessageQueueStrategy
.
allocate
(
consumerGroup
,
clientId
,
mqAll
,
cidAll
);
int
index
=
cidAll
.
indexOf
(
clientId
);
if
(
index
>=
0
)
{
for
(
int
i
=
1
;
i
<=
setMessageRequestModeRequestBody
.
getPopShareQueueNum
();
i
++)
{
index
++;
index
=
index
%
cidAll
.
size
();
List
<
MessageQueue
>
tmp
=
allocateMessageQueueStrategy
.
allocate
(
consumerGroup
,
cidAll
.
get
(
index
),
mqAll
,
cidAll
);
allocateResult
.
addAll
(
tmp
);
}
}
}
else
{
//make sure each cid is assigned
allocateResult
=
allocate
(
consumerGroup
,
clientId
,
mqAll
,
cidAll
);
}
}
}
else
{
allocateResult
=
allocateMessageQueueStrategy
.
allocate
(
consumerGroup
,
clientId
,
mqAll
,
cidAll
);
}
}
catch
(
Throwable
e
)
{
log
.
error
(
"QueryLoad: no assignment for group[{}] topic[{}], allocate message queue exception. strategy name: {}, ex: {}"
,
consumerGroup
,
topic
,
strategyName
,
e
);
return
null
;
}
assignedQueueSet
=
new
HashSet
<
MessageQueue
>();
if
(
allocateResult
!=
null
)
{
assignedQueueSet
.
addAll
(
allocateResult
);
}
break
;
}
default
:
break
;
}
return
assignedQueueSet
;
}
private
List
<
MessageQueue
>
allocate
(
String
consumerGroup
,
String
currentCID
,
List
<
MessageQueue
>
mqAll
,
List
<
String
>
cidAll
)
{
if
(
currentCID
==
null
||
currentCID
.
length
()
<
1
)
{
throw
new
IllegalArgumentException
(
"currentCID is empty"
);
}
if
(
mqAll
==
null
||
mqAll
.
isEmpty
())
{
throw
new
IllegalArgumentException
(
"mqAll is null or mqAll empty"
);
}
if
(
cidAll
==
null
||
cidAll
.
isEmpty
())
{
throw
new
IllegalArgumentException
(
"cidAll is null or cidAll empty"
);
}
List
<
MessageQueue
>
result
=
new
ArrayList
<
MessageQueue
>();
if
(!
cidAll
.
contains
(
currentCID
))
{
log
.
info
(
"[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}"
,
consumerGroup
,
currentCID
,
cidAll
);
return
result
;
}
int
index
=
cidAll
.
indexOf
(
currentCID
);
result
.
add
(
mqAll
.
get
(
index
%
mqAll
.
size
()));
return
result
;
}
private
RemotingCommand
setMessageRequestMode
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
final
SetMessageRequestModeRequestBody
requestBody
=
SetMessageRequestModeRequestBody
.
decode
(
request
.
getBody
(),
SetMessageRequestModeRequestBody
.
class
);
final
String
topic
=
requestBody
.
getTopic
();
if
(
topic
.
startsWith
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
))
{
response
.
setCode
(
ResponseCode
.
NO_PERMISSION
);
response
.
setRemark
(
"retry topic is not allowed to set mode"
);
return
response
;
}
final
String
consumerGroup
=
requestBody
.
getConsumerGroup
();
this
.
messageRequestModeManager
.
setMessageRequestMode
(
topic
,
consumerGroup
,
requestBody
);
this
.
messageRequestModeManager
.
persist
();
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
return
response
;
}
}
broker/src/main/java/org/apache/rocketmq/broker/util/MsgUtil.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.util
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.common.message.MessageExt
;
public
final
class
MsgUtil
{
private
MsgUtil
()
{
}
public
static
void
setMessageDeliverTime
(
BrokerController
brokerController
,
Message
msgInner
,
long
timeMillis
)
{
msgInner
.
setDelayTimeLevel
(
brokerController
.
getMessageStore
().
getScheduleMessageService
().
computeDelayLevel
(
timeMillis
));
}
public
static
long
getMessageDeliverTime
(
BrokerController
brokerController
,
MessageExt
msgInner
)
{
return
brokerController
.
getMessageStore
().
getScheduleMessageService
().
computeDeliverTimestamp
(
msgInner
.
getDelayTimeLevel
(),
msgInner
.
getStoreTimestamp
());
}
}
broker/src/test/java/org/apache/rocketmq/broker/processor/AckMessageProcessorTest.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.lang.reflect.Field
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ClientChannelInfo
;
import
org.apache.rocketmq.broker.client.net.Broker2Client
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ExtraInfoUtil
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumerData
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
import
org.apache.rocketmq.remoting.protocol.LanguageCode
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.AppendMessageResult
;
import
org.apache.rocketmq.store.AppendMessageStatus
;
import
org.apache.rocketmq.store.DefaultMessageStore
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
org.apache.rocketmq.store.PutMessageResult
;
import
org.apache.rocketmq.store.PutMessageStatus
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.schedule.ScheduleMessageService
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
apache
.
rocketmq
.
broker
.
processor
.
PullMessageProcessorTest
.
createConsumerData
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
AckMessageProcessorTest
{
private
AckMessageProcessor
ackMessageProcessor
;
@Spy
private
BrokerController
brokerController
=
new
BrokerController
(
new
BrokerConfig
(),
new
NettyServerConfig
(),
new
NettyClientConfig
(),
new
MessageStoreConfig
());
@Mock
private
ChannelHandlerContext
handlerContext
;
@Mock
private
DefaultMessageStore
messageStore
;
@Mock
private
Channel
channel
;
private
String
topic
=
"FooBar"
;
private
String
group
=
"FooBarGroup"
;
private
ClientChannelInfo
clientInfo
;
@Mock
private
Broker2Client
broker2Client
;
@Before
public
void
init
()
throws
IllegalAccessException
,
NoSuchFieldException
{
clientInfo
=
new
ClientChannelInfo
(
channel
,
"127.0.0.1"
,
LanguageCode
.
JAVA
,
0
);
brokerController
.
setMessageStore
(
messageStore
);
Field
field
=
BrokerController
.
class
.
getDeclaredField
(
"broker2Client"
);
field
.
setAccessible
(
true
);
field
.
set
(
brokerController
,
broker2Client
);
ScheduleMessageService
scheduleMessageService
=
new
ScheduleMessageService
(
messageStore
);
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMessageDelayLevel
(
"5s 10s"
);
when
(
messageStore
.
getMessageStoreConfig
()).
thenReturn
(
messageStoreConfig
);
scheduleMessageService
.
parseDelayLevel
();
when
(
messageStore
.
getScheduleMessageService
()).
thenReturn
(
scheduleMessageService
);
Channel
mockChannel
=
mock
(
Channel
.
class
);
when
(
handlerContext
.
channel
()).
thenReturn
(
mockChannel
);
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
put
(
topic
,
new
TopicConfig
());
ConsumerData
consumerData
=
createConsumerData
(
group
,
topic
);
brokerController
.
getConsumerManager
().
registerConsumer
(
consumerData
.
getGroupName
(),
clientInfo
,
consumerData
.
getConsumeType
(),
consumerData
.
getMessageModel
(),
consumerData
.
getConsumeFromWhere
(),
consumerData
.
getSubscriptionDataSet
(),
false
);
ackMessageProcessor
=
new
AckMessageProcessor
(
brokerController
);
}
@Test
public
void
testProcessRequest_Success
()
throws
RemotingCommandException
,
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
{
when
(
messageStore
.
putMessage
(
any
(
MessageExtBrokerInner
.
class
))).
thenReturn
(
new
PutMessageResult
(
PutMessageStatus
.
PUT_OK
,
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
)));
int
queueId
=
0
;
long
queueOffset
=
0
;
long
popTime
=
System
.
currentTimeMillis
()
-
1_000
;
long
invisibleTime
=
30_000
;
int
reviveQid
=
0
;
String
brokerName
=
"test_broker"
;
String
extraInfo
=
ExtraInfoUtil
.
buildExtraInfo
(
queueOffset
,
popTime
,
invisibleTime
,
reviveQid
,
topic
,
brokerName
,
queueId
)
+
MessageConst
.
KEY_SEPARATOR
+
queueOffset
;
AckMessageRequestHeader
requestHeader
=
new
AckMessageRequestHeader
();
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setQueueId
(
0
);
requestHeader
.
setOffset
(
0L
);
requestHeader
.
setConsumerGroup
(
group
);
requestHeader
.
setExtraInfo
(
extraInfo
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
ACK_MESSAGE
,
requestHeader
);
request
.
makeCustomHeaderToNet
();
RemotingCommand
responseToReturn
=
ackMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
assertThat
(
responseToReturn
.
getOpaque
()).
isEqualTo
(
request
.
getOpaque
());
}
}
\ No newline at end of file
broker/src/test/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessorTest.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.lang.reflect.Field
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ClientChannelInfo
;
import
org.apache.rocketmq.broker.client.net.Broker2Client
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.ExtraInfoUtil
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumerData
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
import
org.apache.rocketmq.remoting.protocol.LanguageCode
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.AppendMessageResult
;
import
org.apache.rocketmq.store.AppendMessageStatus
;
import
org.apache.rocketmq.store.DefaultMessageStore
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
org.apache.rocketmq.store.PutMessageResult
;
import
org.apache.rocketmq.store.PutMessageStatus
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.schedule.ScheduleMessageService
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
apache
.
rocketmq
.
broker
.
processor
.
PullMessageProcessorTest
.
createConsumerData
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
ChangeInvisibleTimeProcessorTest
{
private
ChangeInvisibleTimeProcessor
changeInvisibleTimeProcessor
;
@Spy
private
BrokerController
brokerController
=
new
BrokerController
(
new
BrokerConfig
(),
new
NettyServerConfig
(),
new
NettyClientConfig
(),
new
MessageStoreConfig
());
@Mock
private
ChannelHandlerContext
handlerContext
;
@Mock
private
DefaultMessageStore
messageStore
;
@Mock
private
Channel
channel
;
private
String
topic
=
"FooBar"
;
private
String
group
=
"FooBarGroup"
;
private
ClientChannelInfo
clientInfo
;
@Mock
private
Broker2Client
broker2Client
;
@Before
public
void
init
()
throws
IllegalAccessException
,
NoSuchFieldException
{
brokerController
.
setMessageStore
(
messageStore
);
Field
field
=
BrokerController
.
class
.
getDeclaredField
(
"broker2Client"
);
field
.
setAccessible
(
true
);
field
.
set
(
brokerController
,
broker2Client
);
ScheduleMessageService
scheduleMessageService
=
new
ScheduleMessageService
(
messageStore
);
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMessageDelayLevel
(
"5s 10s"
);
when
(
messageStore
.
getMessageStoreConfig
()).
thenReturn
(
messageStoreConfig
);
scheduleMessageService
.
parseDelayLevel
();
when
(
messageStore
.
getScheduleMessageService
()).
thenReturn
(
scheduleMessageService
);
Channel
mockChannel
=
mock
(
Channel
.
class
);
when
(
handlerContext
.
channel
()).
thenReturn
(
mockChannel
);
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
put
(
topic
,
new
TopicConfig
());
ConsumerData
consumerData
=
createConsumerData
(
group
,
topic
);
clientInfo
=
new
ClientChannelInfo
(
channel
,
"127.0.0.1"
,
LanguageCode
.
JAVA
,
0
);
brokerController
.
getConsumerManager
().
registerConsumer
(
consumerData
.
getGroupName
(),
clientInfo
,
consumerData
.
getConsumeType
(),
consumerData
.
getMessageModel
(),
consumerData
.
getConsumeFromWhere
(),
consumerData
.
getSubscriptionDataSet
(),
false
);
changeInvisibleTimeProcessor
=
new
ChangeInvisibleTimeProcessor
(
brokerController
);
}
@Test
public
void
testProcessRequest_Success
()
throws
RemotingCommandException
,
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
{
when
(
messageStore
.
putMessage
(
any
(
MessageExtBrokerInner
.
class
))).
thenReturn
(
new
PutMessageResult
(
PutMessageStatus
.
PUT_OK
,
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
)));
int
queueId
=
0
;
long
queueOffset
=
0
;
long
popTime
=
System
.
currentTimeMillis
()
-
1_000
;
long
invisibleTime
=
30_000
;
int
reviveQid
=
0
;
String
brokerName
=
"test_broker"
;
String
extraInfo
=
ExtraInfoUtil
.
buildExtraInfo
(
queueOffset
,
popTime
,
invisibleTime
,
reviveQid
,
topic
,
brokerName
,
queueId
)
+
MessageConst
.
KEY_SEPARATOR
+
queueOffset
;
ChangeInvisibleTimeRequestHeader
requestHeader
=
new
ChangeInvisibleTimeRequestHeader
();
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setQueueId
(
queueId
);
requestHeader
.
setOffset
(
queueOffset
);
requestHeader
.
setConsumerGroup
(
group
);
requestHeader
.
setExtraInfo
(
extraInfo
);
requestHeader
.
setInvisibleTime
(
invisibleTime
);
final
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
CHANGE_MESSAGE_INVISIBLETIME
,
requestHeader
);
request
.
makeCustomHeaderToNet
();
RemotingCommand
responseToReturn
=
changeInvisibleTimeProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
assertThat
(
responseToReturn
.
getOpaque
()).
isEqualTo
(
request
.
getOpaque
());
}
}
\ No newline at end of file
broker/src/test/java/org/apache/rocketmq/broker/processor/PopBufferMergeServiceTest.java
0 → 100644
浏览文件 @
ced6b023
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
org.apache.commons.lang3.reflect.FieldUtils
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ClientChannelInfo
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumerData
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
import
org.apache.rocketmq.store.DefaultMessageStore
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.pop.AckMsg
;
import
org.apache.rocketmq.store.pop.PopCheckPoint
;
import
org.apache.rocketmq.store.schedule.ScheduleMessageService
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
apache
.
rocketmq
.
broker
.
processor
.
PullMessageProcessorTest
.
createConsumerData
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
PopBufferMergeServiceTest
{
@Spy
private
BrokerController
brokerController
=
new
BrokerController
(
new
BrokerConfig
(),
new
NettyServerConfig
(),
new
NettyClientConfig
(),
new
MessageStoreConfig
());
@Mock
private
PopMessageProcessor
popMessageProcessor
;
@Mock
private
ChannelHandlerContext
handlerContext
;
@Mock
private
DefaultMessageStore
messageStore
;
private
ScheduleMessageService
scheduleMessageService
;
private
ClientChannelInfo
clientChannelInfo
;
private
String
group
=
"FooBarGroup"
;
private
String
topic
=
"FooBar"
;
@Before
public
void
init
()
throws
Exception
{
FieldUtils
.
writeField
(
brokerController
.
getBrokerConfig
(),
"enablePopBufferMerge"
,
true
,
true
);
brokerController
.
setMessageStore
(
messageStore
);
popMessageProcessor
=
new
PopMessageProcessor
(
brokerController
);
scheduleMessageService
=
new
ScheduleMessageService
(
messageStore
);
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMessageDelayLevel
(
"5s 10s"
);
when
(
messageStore
.
getMessageStoreConfig
()).
thenReturn
(
messageStoreConfig
);
scheduleMessageService
.
parseDelayLevel
();
Channel
mockChannel
=
mock
(
Channel
.
class
);
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
put
(
topic
,
new
TopicConfig
());
clientChannelInfo
=
new
ClientChannelInfo
(
mockChannel
);
ConsumerData
consumerData
=
createConsumerData
(
group
,
topic
);
brokerController
.
getConsumerManager
().
registerConsumer
(
consumerData
.
getGroupName
(),
clientChannelInfo
,
consumerData
.
getConsumeType
(),
consumerData
.
getMessageModel
(),
consumerData
.
getConsumeFromWhere
(),
consumerData
.
getSubscriptionDataSet
(),
false
);
}
@Test
(
timeout
=
10_000
)
public
void
testBasic
()
throws
Exception
{
PopBufferMergeService
popBufferMergeService
=
new
PopBufferMergeService
(
brokerController
,
popMessageProcessor
);
popBufferMergeService
.
start
();
PopCheckPoint
ck
=
new
PopCheckPoint
();
ck
.
setBitMap
(
0
);
int
msgCnt
=
1
;
ck
.
setNum
((
byte
)
msgCnt
);
long
popTime
=
System
.
currentTimeMillis
()
-
1000
;
ck
.
setPopTime
(
popTime
);
int
invisibleTime
=
30_000
;
ck
.
setInvisibleTime
(
invisibleTime
);
int
offset
=
100
;
ck
.
setStartOffset
(
offset
);
ck
.
setCId
(
group
);
ck
.
setTopic
(
topic
);
int
queueId
=
0
;
ck
.
setQueueId
((
byte
)
queueId
);
int
reviveQid
=
0
;
long
nextBeginOffset
=
101L
;
long
ackOffset
=
offset
;
AckMsg
ackMsg
=
new
AckMsg
();
ackMsg
.
setAckOffset
(
ackOffset
);
ackMsg
.
setStartOffset
(
offset
);
ackMsg
.
setConsumerGroup
(
group
);
ackMsg
.
setTopic
(
topic
);
ackMsg
.
setQueueId
(
queueId
);
ackMsg
.
setPopTime
(
popTime
);
try
{
assertThat
(
popBufferMergeService
.
addCk
(
ck
,
reviveQid
,
ackOffset
,
nextBeginOffset
)).
isTrue
();
assertThat
(
popBufferMergeService
.
getLatestOffset
(
topic
,
group
,
queueId
)).
isEqualTo
(
nextBeginOffset
);
Thread
.
sleep
(
1000
);
// wait background threads of PopBufferMergeService run for some time
assertThat
(
popBufferMergeService
.
addAk
(
reviveQid
,
ackMsg
)).
isTrue
();
assertThat
(
popBufferMergeService
.
getLatestOffset
(
topic
,
group
,
queueId
)).
isEqualTo
(
nextBeginOffset
);
}
finally
{
popBufferMergeService
.
shutdown
(
true
);
}
}
}
\ No newline at end of file
broker/src/test/java/org/apache/rocketmq/broker/processor/PopMessageProcessorTest.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.net.InetSocketAddress
;
import
java.nio.ByteBuffer
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ClientChannelInfo
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.constant.ConsumeInitMode
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumerData
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.AppendMessageResult
;
import
org.apache.rocketmq.store.AppendMessageStatus
;
import
org.apache.rocketmq.store.DefaultMessageStore
;
import
org.apache.rocketmq.store.GetMessageResult
;
import
org.apache.rocketmq.store.GetMessageStatus
;
import
org.apache.rocketmq.store.MappedFile
;
import
org.apache.rocketmq.store.PutMessageResult
;
import
org.apache.rocketmq.store.PutMessageStatus
;
import
org.apache.rocketmq.store.SelectMappedBufferResult
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.apache.rocketmq.store.schedule.ScheduleMessageService
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
apache
.
rocketmq
.
broker
.
processor
.
PullMessageProcessorTest
.
createConsumerData
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyInt
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyString
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
PopMessageProcessorTest
{
private
PopMessageProcessor
popMessageProcessor
;
@Spy
private
BrokerController
brokerController
=
new
BrokerController
(
new
BrokerConfig
(),
new
NettyServerConfig
(),
new
NettyClientConfig
(),
new
MessageStoreConfig
());
@Mock
private
ChannelHandlerContext
handlerContext
;
@Mock
private
DefaultMessageStore
messageStore
;
private
ScheduleMessageService
scheduleMessageService
;
private
ClientChannelInfo
clientChannelInfo
;
private
String
group
=
"FooBarGroup"
;
private
String
topic
=
"FooBar"
;
@Before
public
void
init
()
{
brokerController
.
setMessageStore
(
messageStore
);
popMessageProcessor
=
new
PopMessageProcessor
(
brokerController
);
scheduleMessageService
=
new
ScheduleMessageService
(
messageStore
);
MessageStoreConfig
messageStoreConfig
=
new
MessageStoreConfig
();
messageStoreConfig
.
setMessageDelayLevel
(
"5s 10s"
);
when
(
messageStore
.
getMessageStoreConfig
()).
thenReturn
(
messageStoreConfig
);
scheduleMessageService
.
parseDelayLevel
();
when
(
messageStore
.
getScheduleMessageService
()).
thenReturn
(
scheduleMessageService
);
when
(
messageStore
.
putMessage
(
any
())).
thenReturn
(
new
PutMessageResult
(
PutMessageStatus
.
PUT_OK
,
new
AppendMessageResult
(
AppendMessageStatus
.
PUT_OK
)));
Channel
mockChannel
=
mock
(
Channel
.
class
);
when
(
mockChannel
.
remoteAddress
()).
thenReturn
(
new
InetSocketAddress
(
1024
));
when
(
handlerContext
.
channel
()).
thenReturn
(
mockChannel
);
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
put
(
topic
,
new
TopicConfig
());
clientChannelInfo
=
new
ClientChannelInfo
(
mockChannel
);
ConsumerData
consumerData
=
createConsumerData
(
group
,
topic
);
brokerController
.
getConsumerManager
().
registerConsumer
(
consumerData
.
getGroupName
(),
clientChannelInfo
,
consumerData
.
getConsumeType
(),
consumerData
.
getMessageModel
(),
consumerData
.
getConsumeFromWhere
(),
consumerData
.
getSubscriptionDataSet
(),
false
);
}
@Test
public
void
testProcessRequest_TopicNotExist
()
throws
RemotingCommandException
{
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
remove
(
topic
);
final
RemotingCommand
request
=
createPopMsgCommand
();
RemotingCommand
response
=
popMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
TOPIC_NOT_EXIST
);
assertThat
(
response
.
getRemark
()).
contains
(
"topic["
+
topic
+
"] not exist"
);
}
@Test
public
void
testProcessRequest_SubNotExist
()
throws
RemotingCommandException
{
brokerController
.
getConsumerManager
().
unregisterConsumer
(
group
,
clientChannelInfo
,
false
);
final
RemotingCommand
request
=
createPopMsgCommand
();
RemotingCommand
response
=
popMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUBSCRIPTION_NOT_EXIST
);
assertThat
(
response
.
getRemark
()).
contains
(
"consumer's group info not exist"
);
}
@Test
public
void
testProcessRequest_Found
()
throws
RemotingCommandException
{
GetMessageResult
getMessageResult
=
createGetMessageResult
(
1
);
when
(
messageStore
.
getMessage
(
anyString
(),
anyString
(),
anyInt
(),
anyLong
(),
anyInt
(),
any
())).
thenReturn
(
getMessageResult
);
final
RemotingCommand
request
=
createPopMsgCommand
();
RemotingCommand
response
=
popMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
@Test
public
void
testProcessRequest_MsgWasRemoving
()
throws
RemotingCommandException
{
GetMessageResult
getMessageResult
=
createGetMessageResult
(
1
);
getMessageResult
.
setStatus
(
GetMessageStatus
.
MESSAGE_WAS_REMOVING
);
when
(
messageStore
.
getMessage
(
anyString
(),
anyString
(),
anyInt
(),
anyLong
(),
anyInt
(),
any
())).
thenReturn
(
getMessageResult
);
final
RemotingCommand
request
=
createPopMsgCommand
();
RemotingCommand
response
=
popMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
@Test
public
void
testProcessRequest_NoMsgInQueue
()
throws
RemotingCommandException
{
GetMessageResult
getMessageResult
=
createGetMessageResult
(
0
);
getMessageResult
.
setStatus
(
GetMessageStatus
.
NO_MESSAGE_IN_QUEUE
);
when
(
messageStore
.
getMessage
(
anyString
(),
anyString
(),
anyInt
(),
anyLong
(),
anyInt
(),
any
())).
thenReturn
(
getMessageResult
);
final
RemotingCommand
request
=
createPopMsgCommand
();
RemotingCommand
response
=
popMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNull
();
}
private
RemotingCommand
createPopMsgCommand
()
{
PopMessageRequestHeader
requestHeader
=
new
PopMessageRequestHeader
();
requestHeader
.
setConsumerGroup
(
group
);
requestHeader
.
setMaxMsgNums
(
30
);
requestHeader
.
setQueueId
(-
1
);
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setInvisibleTime
(
10_000
);
requestHeader
.
setInitMode
(
ConsumeInitMode
.
MAX
);
requestHeader
.
setOrder
(
false
);
requestHeader
.
setPollTime
(
15_000
);
requestHeader
.
setBornTime
(
System
.
currentTimeMillis
());
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
POP_MESSAGE
,
requestHeader
);
request
.
makeCustomHeaderToNet
();
return
request
;
}
private
GetMessageResult
createGetMessageResult
(
int
msgCnt
)
{
GetMessageResult
getMessageResult
=
new
GetMessageResult
();
getMessageResult
.
setStatus
(
GetMessageStatus
.
FOUND
);
getMessageResult
.
setMinOffset
(
100
);
getMessageResult
.
setMaxOffset
(
1024
);
getMessageResult
.
setNextBeginOffset
(
516
);
for
(
int
i
=
0
;
i
<
msgCnt
;
i
++)
{
ByteBuffer
bb
=
ByteBuffer
.
allocate
(
64
);
bb
.
putLong
(
MessageDecoder
.
MESSAGE_STORE_TIMESTAMP_POSITION
,
System
.
currentTimeMillis
());
getMessageResult
.
addMessage
(
new
SelectMappedBufferResult
(
200
,
bb
,
64
,
new
MappedFile
()));
}
return
getMessageResult
;
}
}
\ No newline at end of file
broker/src/test/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessorTest.java
0 → 100644
浏览文件 @
ced6b023
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
com.google.common.collect.ImmutableSet
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ClientChannelInfo
;
import
org.apache.rocketmq.broker.loadbalance.AssignmentManager
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.common.message.MessageRequestMode
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.body.QueryAssignmentRequestBody
;
import
org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody
;
import
org.apache.rocketmq.common.protocol.body.SetMessageRequestModeRequestBody
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumerData
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
import
org.apache.rocketmq.remoting.protocol.LanguageCode
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.MessageStore
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
apache
.
rocketmq
.
broker
.
processor
.
PullMessageProcessorTest
.
createConsumerData
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
Mockito
.
doReturn
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
QueryAssignmentProcessorTest
{
private
QueryAssignmentProcessor
queryAssignmentProcessor
;
@Spy
private
BrokerController
brokerController
=
new
BrokerController
(
new
BrokerConfig
(),
new
NettyServerConfig
(),
new
NettyClientConfig
(),
new
MessageStoreConfig
());
@Mock
private
AssignmentManager
assignmentManager
;
@Mock
private
ChannelHandlerContext
handlerContext
;
@Mock
private
MessageStore
messageStore
;
@Mock
private
Channel
channel
;
private
String
topic
=
"FooBar"
;
private
String
group
=
"FooBarGroup"
;
private
String
clientId
=
"127.0.0.1"
;
private
ClientChannelInfo
clientInfo
;
@Before
public
void
init
()
throws
IllegalAccessException
,
NoSuchFieldException
{
clientInfo
=
new
ClientChannelInfo
(
channel
,
"127.0.0.1"
,
LanguageCode
.
JAVA
,
0
);
brokerController
.
setMessageStore
(
messageStore
);
doReturn
(
assignmentManager
).
when
(
brokerController
).
getAssignmentManager
();
when
(
assignmentManager
.
getTopicSubscribeInfo
(
topic
)).
thenReturn
(
ImmutableSet
.
of
(
new
MessageQueue
(
topic
,
"broker-1"
,
0
),
new
MessageQueue
(
topic
,
"broker-2"
,
1
)));
queryAssignmentProcessor
=
new
QueryAssignmentProcessor
(
brokerController
);
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
put
(
topic
,
new
TopicConfig
());
ConsumerData
consumerData
=
createConsumerData
(
group
,
topic
);
brokerController
.
getConsumerManager
().
registerConsumer
(
consumerData
.
getGroupName
(),
clientInfo
,
consumerData
.
getConsumeType
(),
consumerData
.
getMessageModel
(),
consumerData
.
getConsumeFromWhere
(),
consumerData
.
getSubscriptionDataSet
(),
false
);
}
@Test
public
void
testQueryAssignment
()
throws
Exception
{
brokerController
.
getProducerManager
().
registerProducer
(
group
,
clientInfo
);
final
RemotingCommand
request
=
createQueryAssignmentRequest
();
RemotingCommand
responseToReturn
=
queryAssignmentProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
assertThat
(
responseToReturn
.
getBody
()).
isNotNull
();
QueryAssignmentResponseBody
responseBody
=
QueryAssignmentResponseBody
.
decode
(
responseToReturn
.
getBody
(),
QueryAssignmentResponseBody
.
class
);
assertThat
(
responseBody
.
getMessageQueueAssignments
()).
size
().
isEqualTo
(
2
);
}
@Test
public
void
testSetMessageRequestMode_Success
()
throws
Exception
{
brokerController
.
getProducerManager
().
registerProducer
(
group
,
clientInfo
);
final
RemotingCommand
request
=
createSetMessageRequestModeRequest
(
topic
);
RemotingCommand
responseToReturn
=
queryAssignmentProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
@Test
public
void
testSetMessageRequestMode_RetryTopic
()
throws
Exception
{
brokerController
.
getProducerManager
().
registerProducer
(
group
,
clientInfo
);
final
RemotingCommand
request
=
createSetMessageRequestModeRequest
(
MixAll
.
RETRY_GROUP_TOPIC_PREFIX
+
topic
);
RemotingCommand
responseToReturn
=
queryAssignmentProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
responseToReturn
.
getCode
()).
isEqualTo
(
ResponseCode
.
NO_PERMISSION
);
}
private
RemotingCommand
createQueryAssignmentRequest
()
{
QueryAssignmentRequestBody
requestBody
=
new
QueryAssignmentRequestBody
();
requestBody
.
setTopic
(
topic
);
requestBody
.
setConsumerGroup
(
group
);
requestBody
.
setClientId
(
clientId
);
requestBody
.
setMessageModel
(
MessageModel
.
CLUSTERING
);
requestBody
.
setStrategyName
(
"AVG"
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
QUERY_ASSIGNMENT
,
null
);
request
.
setBody
(
requestBody
.
encode
());
return
request
;
}
private
RemotingCommand
createSetMessageRequestModeRequest
(
String
topic
)
{
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
SET_MESSAGE_REQUEST_MODE
,
null
);
SetMessageRequestModeRequestBody
requestBody
=
new
SetMessageRequestModeRequestBody
();
requestBody
.
setTopic
(
topic
);
requestBody
.
setConsumerGroup
(
group
);
requestBody
.
setMode
(
MessageRequestMode
.
POP
);
requestBody
.
setPopShareQueueNum
(
0
);
request
.
setBody
(
requestBody
.
encode
());
return
request
;
}
private
RemotingCommand
createResponse
(
int
code
,
RemotingCommand
request
)
{
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
response
.
setCode
(
code
);
response
.
setOpaque
(
request
.
getOpaque
());
return
response
;
}
}
\ No newline at end of file
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
浏览文件 @
ced6b023
...
...
@@ -20,14 +20,22 @@ import java.util.ArrayList;
import
java.util.List
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.logging.InternalLogger
;
/**
* Average Hashing queue algorithm
*/
public
class
AllocateMessageQueueAveragely
implements
AllocateMessageQueueStrategy
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
private
InternalLogger
log
;
public
AllocateMessageQueueAveragely
()
{
log
=
ClientLogger
.
getLog
();
}
public
AllocateMessageQueueAveragely
(
InternalLogger
log
)
{
this
.
log
=
log
;
}
@Override
public
List
<
MessageQueue
>
allocate
(
String
consumerGroup
,
String
currentCID
,
List
<
MessageQueue
>
mqAll
,
...
...
client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
浏览文件 @
ced6b023
...
...
@@ -20,14 +20,22 @@ import java.util.ArrayList;
import
java.util.List
;
import
org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy
;
import
org.apache.rocketmq.client.log.ClientLogger
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.common.message.MessageQueue
;
import
org.apache.rocketmq.logging.InternalLogger
;
/**
* Cycle average Hashing queue algorithm
*/
public
class
AllocateMessageQueueAveragelyByCircle
implements
AllocateMessageQueueStrategy
{
private
final
InternalLogger
log
=
ClientLogger
.
getLog
();
private
InternalLogger
log
;
public
AllocateMessageQueueAveragelyByCircle
()
{
log
=
ClientLogger
.
getLog
();
}
public
AllocateMessageQueueAveragelyByCircle
(
InternalLogger
log
)
{
this
.
log
=
log
;
}
@Override
public
List
<
MessageQueue
>
allocate
(
String
consumerGroup
,
String
currentCID
,
List
<
MessageQueue
>
mqAll
,
...
...
distribution/conf/logback_broker.xml
浏览文件 @
ced6b023
...
...
@@ -257,6 +257,30 @@
</triggeringPolicy>
</appender>
<appender
name=
"RocketmqPopAppender_inner"
class=
"ch.qos.logback.core.rolling.RollingFileAppender"
>
<file>
${user.home}/logs/rocketmqlogs/pop.log
</file>
<append>
true
</append>
<rollingPolicy
class=
"ch.qos.logback.core.rolling.FixedWindowRollingPolicy"
>
<fileNamePattern>
${user.home}/logs/rocketmqlogs/otherdays/pop.%i.log
</fileNamePattern>
<minIndex>
1
</minIndex>
<maxIndex>
20
</maxIndex>
</rollingPolicy>
<triggeringPolicy
class=
"ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy"
>
<maxFileSize>
128MB
</maxFileSize>
</triggeringPolicy>
<encoder>
<pattern>
%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n
</pattern>
<charset
class=
"java.nio.charset.Charset"
>
UTF-8
</charset>
</encoder>
</appender>
<appender
name=
"RocketmqPopAppender"
class=
"ch.qos.logback.classic.AsyncAppender"
>
<appender-ref
ref=
"RocketmqPopAppender_inner"
/>
</appender>
<appender
name=
"STDOUT"
class=
"ch.qos.logback.core.ConsoleAppender"
>
<append>
true
</append>
<encoder>
...
...
@@ -330,6 +354,11 @@
<appender-ref
ref=
"STDOUT"
/>
</logger>
<logger
name=
"RocketmqPop"
additivity=
"false"
>
<level
value=
"INFO"
/>
<appender-ref
ref=
"RocketmqPopAppender"
/>
</logger>
<root>
<level
value=
"INFO"
/>
<appender-ref
ref=
"DefaultAppender"
/>
...
...
pom.xml
浏览文件 @
ced6b023
...
...
@@ -437,7 +437,7 @@
<dependency>
<groupId>
org.mockito
</groupId>
<artifactId>
mockito-core
</artifactId>
<version>
2.2
3.0
</version>
<version>
2.2
8.2
</version>
<scope>
test
</scope>
</dependency>
<dependency>
...
...
@@ -571,6 +571,11 @@
<artifactId>
guava
</artifactId>
<version>
19.0
</version>
</dependency>
<dependency>
<groupId>
com.googlecode.concurrentlinkedhashmap
</groupId>
<artifactId>
concurrentlinkedhashmap-lru
</artifactId>
<version>
1.4.2
</version>
</dependency>
<dependency>
<groupId>
io.openmessaging
</groupId>
<artifactId>
openmessaging-api
</artifactId>
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
浏览文件 @
ced6b023
...
...
@@ -172,11 +172,18 @@ public class RemotingHelper {
public
static
String
parseSocketAddressAddr
(
SocketAddress
socketAddress
)
{
if
(
socketAddress
!=
null
)
{
// Default toString of InetSocketAddress is "hostName/IP:port"
final
String
addr
=
socketAddress
.
toString
();
if
(
addr
.
length
()
>
0
)
{
if
(
addr
.
contains
(
"/"
))
{
String
[]
segments
=
addr
.
split
(
"/"
);
if
(
segments
.
length
>
1
)
{
return
segments
[
1
];
}
}
return
addr
.
substring
(
1
);
}
return
addr
;
}
return
""
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录