Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
小五666\n哈哈
Rocketmq
提交
14a75e5c
R
Rocketmq
项目概览
小五666\n哈哈
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
14a75e5c
编写于
2月 22, 2019
作者:
D
duhenglucky
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Add embedded start mode(single service) for snode and enode
上级
d7a2603c
变更
25
隐藏空白更改
内联
并排
Showing
25 changed file
with
481 addition
and
233 deletion
+481
-233
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+43
-20
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
...c/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+13
-10
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
...cketmq/broker/processor/AbstractSendMessageProcessor.java
+12
-13
broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
...ache/rocketmq/broker/processor/ClientManageProcessor.java
+4
-4
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
...pache/rocketmq/broker/processor/SendMessageProcessor.java
+31
-16
common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
...src/main/java/org/apache/rocketmq/common/SnodeConfig.java
+32
-0
snode/pom.xml
snode/pom.xml
+6
-1
snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
.../main/java/org/apache/rocketmq/snode/SnodeController.java
+10
-7
snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
...src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
+32
-20
snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
...pache/rocketmq/snode/client/SubscriptionGroupManager.java
+3
-31
snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
...org/apache/rocketmq/snode/client/SubscriptionManager.java
+0
-5
snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
...e/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
+2
-13
snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
...g/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
+32
-32
snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
...che/rocketmq/snode/processor/ConsumerManageProcessor.java
+38
-7
snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
...apache/rocketmq/snode/processor/PullMessageProcessor.java
+4
-2
snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
...apache/rocketmq/snode/processor/SendMessageProcessor.java
+3
-3
snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
...node/processor/mqtthandler/MqttPingreqMessageHandler.java
+3
-1
snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
.../java/org/apache/rocketmq/snode/service/AdminService.java
+2
-2
snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
.../java/org/apache/rocketmq/snode/service/EnodeService.java
+20
-13
snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
...apache/rocketmq/snode/service/impl/ClientServiceImpl.java
+1
-0
snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
...he/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
+138
-0
snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
...g/apache/rocketmq/snode/service/impl/PushServiceImpl.java
+2
-2
snode/src/main/java/org/apache/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
...e/rocketmq/snode/service/impl/RemoteEnodeServiceImpl.java
+43
-24
snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
...he/rocketmq/snode/processor/SendMessageProcessorTest.java
+2
-2
snode/src/test/java/org/apache/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
...he/rocketmq/snode/service/RemoteEnodeServiceImplTest.java
+5
-5
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
14a75e5c
...
...
@@ -121,6 +121,7 @@ public class BrokerController {
private
final
ClientHousekeepingService
clientHousekeepingService
;
private
final
PullMessageProcessor
pullMessageProcessor
;
private
final
SnodePullMessageProcessor
snodePullMessageProcessor
;
private
final
SendMessageProcessor
sendProcessor
;
private
final
PullRequestHoldService
pullRequestHoldService
;
private
final
MessageArrivingListener
messageArrivingListener
;
private
final
Broker2Client
broker2Client
;
...
...
@@ -164,6 +165,9 @@ public class BrokerController {
private
TransactionalMessageService
transactionalMessageService
;
private
AbstractTransactionalMessageCheckListener
transactionalMessageCheckListener
;
private
Future
<?>
slaveSyncFuture
;
private
ClientManageProcessor
clientManageProcessor
;
private
AdminBrokerProcessor
adminProcessor
;
private
ConsumerManageProcessor
consumerManageProcessor
;
public
BrokerController
(
final
BrokerConfig
brokerConfig
,
...
...
@@ -178,6 +182,7 @@ public class BrokerController {
this
.
consumerOffsetManager
=
new
ConsumerOffsetManager
(
this
);
this
.
topicConfigManager
=
new
TopicConfigManager
(
this
);
this
.
pullMessageProcessor
=
new
PullMessageProcessor
(
this
);
this
.
sendProcessor
=
new
SendMessageProcessor
(
this
);
this
.
pullRequestHoldService
=
new
PullRequestHoldService
(
this
);
this
.
messageArrivingListener
=
new
NotifyMessageArrivingListener
(
this
.
pullRequestHoldService
);
this
.
consumerIdsChangeListener
=
new
DefaultConsumerIdsChangeListener
(
this
);
...
...
@@ -543,9 +548,8 @@ public class BrokerController {
/**
* SendMessageProcessor
*/
SendMessageProcessor
sendProcessor
=
new
SendMessageProcessor
(
this
);
sendProcessor
.
registerSendMessageHook
(
sendMessageHookList
);
sendProcessor
.
registerConsumeMessageHook
(
consumeMessageHookList
);
this
.
sendProcessor
.
registerSendMessageHook
(
sendMessageHookList
);
this
.
sendProcessor
.
registerConsumeMessageHook
(
consumeMessageHookList
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE
,
sendProcessor
,
this
.
sendMessageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
SEND_MESSAGE_V2
,
sendProcessor
,
this
.
sendMessageExecutor
);
...
...
@@ -575,28 +579,28 @@ public class BrokerController {
/**
* ClientManageProcessor
*/
ClientManageProcessor
client
Processor
=
new
ClientManageProcessor
(
this
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
clientProcessor
,
this
.
heartbeatExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CREATE_RETRY_TOPIC
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
clientManage
Processor
=
new
ClientManageProcessor
(
this
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
client
Manage
Processor
,
this
.
heartbeatExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
CREATE_RETRY_TOPIC
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
clientProcessor
,
this
.
heartbeatExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CREATE_RETRY_TOPIC
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
client
Manage
Processor
,
this
.
heartbeatExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
CREATE_RETRY_TOPIC
,
client
Manage
Processor
,
this
.
clientManageExecutor
);
/**
* ConsumerManageProcessor
*/
ConsumerManageProcessor
consumerManageProcessor
=
new
ConsumerManageProcessor
(
this
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
GET_CONSUMER_LIST_BY_GROUP
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
UPDATE_CONSUMER_OFFSET
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
QUERY_CONSUMER_OFFSET
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
consumerManageProcessor
=
new
ConsumerManageProcessor
(
this
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
GET_CONSUMER_LIST_BY_GROUP
,
this
.
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
UPDATE_CONSUMER_OFFSET
,
this
.
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
remotingServer
.
registerProcessor
(
RequestCode
.
QUERY_CONSUMER_OFFSET
,
this
.
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
GET_CONSUMER_LIST_BY_GROUP
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UPDATE_CONSUMER_OFFSET
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
QUERY_CONSUMER_OFFSET
,
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
GET_CONSUMER_LIST_BY_GROUP
,
this
.
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UPDATE_CONSUMER_OFFSET
,
this
.
consumerManageProcessor
,
this
.
consumerManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
QUERY_CONSUMER_OFFSET
,
this
.
consumerManageProcessor
,
this
.
consumerManageExecutor
);
/**
* EndTransactionProcessor
...
...
@@ -607,7 +611,7 @@ public class BrokerController {
/**
* Default
*/
AdminBrokerProcessor
adminProcessor
=
new
AdminBrokerProcessor
(
this
);
this
.
adminProcessor
=
new
AdminBrokerProcessor
(
this
);
this
.
remotingServer
.
registerDefaultProcessor
(
adminProcessor
,
this
.
adminBrokerExecutor
);
this
.
fastRemotingServer
.
registerDefaultProcessor
(
adminProcessor
,
this
.
adminBrokerExecutor
);
}
...
...
@@ -1220,4 +1224,23 @@ public class BrokerController {
}
}
public
SendMessageProcessor
getSendProcessor
()
{
return
sendProcessor
;
}
public
ClientManageProcessor
getClientManageProcessor
()
{
return
clientManageProcessor
;
}
public
AdminBrokerProcessor
getAdminProcessor
()
{
return
adminProcessor
;
}
public
void
setAdminProcessor
(
AdminBrokerProcessor
adminProcessor
)
{
this
.
adminProcessor
=
adminProcessor
;
}
public
ConsumerManageProcessor
getConsumerManageProcessor
()
{
return
consumerManageProcessor
;
}
}
broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
浏览文件 @
14a75e5c
...
...
@@ -18,6 +18,11 @@ package org.apache.rocketmq.broker;
import
ch.qos.logback.classic.LoggerContext
;
import
ch.qos.logback.classic.joran.JoranConfigurator
;
import
java.io.BufferedInputStream
;
import
java.io.FileInputStream
;
import
java.io.InputStream
;
import
java.util.Properties
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
org.apache.commons.cli.CommandLine
;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.Options
;
...
...
@@ -28,10 +33,10 @@ import org.apache.rocketmq.common.MixAll;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.common.RemotingUtil
;
import
org.apache.rocketmq.remoting.common.TlsMode
;
import
org.apache.rocketmq.remoting.ClientConfig
;
import
org.apache.rocketmq.remoting.ServerConfig
;
import
org.apache.rocketmq.remoting.common.RemotingUtil
;
import
org.apache.rocketmq.remoting.common.TlsMode
;
import
org.apache.rocketmq.remoting.netty.NettySystemConfig
;
import
org.apache.rocketmq.remoting.netty.TlsSystemConfig
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
...
...
@@ -40,12 +45,6 @@ import org.apache.rocketmq.store.config.BrokerRole;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.slf4j.LoggerFactory
;
import
java.io.BufferedInputStream
;
import
java.io.FileInputStream
;
import
java.io.InputStream
;
import
java.util.Properties
;
import
java.util.concurrent.atomic.AtomicInteger
;
import
static
org
.
apache
.
rocketmq
.
remoting
.
netty
.
TlsSystemConfig
.
TLS_ENABLE
;
public
class
BrokerStartup
{
...
...
@@ -53,6 +52,7 @@ public class BrokerStartup {
public
static
CommandLine
commandLine
=
null
;
public
static
String
configFile
=
null
;
public
static
InternalLogger
log
;
private
static
BrokerController
brokerController
=
null
;
public
static
void
main
(
String
[]
args
)
{
start
(
createBrokerController
(
args
));
...
...
@@ -238,7 +238,7 @@ public class BrokerStartup {
}
}
},
"ShutdownHook"
));
brokerController
=
controller
;
return
controller
;
}
catch
(
Throwable
e
)
{
e
.
printStackTrace
();
...
...
@@ -260,7 +260,6 @@ public class BrokerStartup {
private
static
Options
buildCommandlineOptions
(
final
Options
options
)
{
Option
opt
=
new
Option
(
"c"
,
"configFile"
,
true
,
"Broker config properties file"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
...
...
@@ -275,4 +274,8 @@ public class BrokerStartup {
return
options
;
}
public
static
BrokerController
getBrokerController
()
{
return
brokerController
;
}
}
broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
浏览文件 @
14a75e5c
...
...
@@ -17,6 +17,11 @@
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Random
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageContext
;
import
org.apache.rocketmq.broker.mqtrace.SendMessageHook
;
...
...
@@ -27,8 +32,6 @@ import org.apache.rocketmq.common.constant.DBMsgConstants;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.constant.PermName
;
import
org.apache.rocketmq.common.help.FAQUrl
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.common.message.MessageAccessor
;
import
org.apache.rocketmq.common.message.MessageConst
;
import
org.apache.rocketmq.common.message.MessageDecoder
;
...
...
@@ -40,18 +43,14 @@ import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
import
org.apache.rocketmq.common.sysflag.MessageSysFlag
;
import
org.apache.rocketmq.common.sysflag.TopicSysFlag
;
import
org.apache.rocketmq.common.utils.ChannelUtil
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.RequestProcessor
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.RequestProcessor
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.MessageExtBrokerInner
;
import
java.net.InetSocketAddress
;
import
java.net.SocketAddress
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Random
;
public
abstract
class
AbstractSendMessageProcessor
implements
RequestProcessor
{
protected
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
BROKER_LOGGER_NAME
);
...
...
@@ -159,7 +158,7 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor {
return
response
;
}
protected
RemotingCommand
msgCheck
(
final
ChannelHandlerContext
ctx
,
protected
RemotingCommand
msgCheck
(
final
String
remoteAddress
,
final
SendMessageRequestHeader
requestHeader
,
final
RemotingCommand
response
)
{
if
(!
PermName
.
isWriteable
(
this
.
brokerController
.
getBrokerConfig
().
getBrokerPermission
())
&&
this
.
brokerController
.
getTopicConfigManager
().
isOrderTopic
(
requestHeader
.
getTopic
()))
{
...
...
@@ -188,11 +187,11 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor {
}
}
log
.
warn
(
"the topic {} not exist, producer: {}"
,
requestHeader
.
getTopic
(),
ctx
.
channel
().
remoteAddress
()
);
log
.
warn
(
"the topic {} not exist, producer: {}"
,
requestHeader
.
getTopic
(),
remoteAddress
);
topicConfig
=
this
.
brokerController
.
getTopicConfigManager
().
createTopicInSendMessageMethod
(
requestHeader
.
getTopic
(),
requestHeader
.
getDefaultTopic
(),
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
())
,
remoteAddress
,
requestHeader
.
getDefaultTopicQueueNums
(),
topicSysFlag
);
if
(
null
==
topicConfig
)
{
...
...
@@ -218,7 +217,7 @@ public abstract class AbstractSendMessageProcessor implements RequestProcessor {
String
errorInfo
=
String
.
format
(
"request queueId[%d] is illegal, %s Producer: %s"
,
queueIdInt
,
topicConfig
.
toString
(),
RemotingHelper
.
parseChannelRemoteAddr
(
ctx
.
channel
())
);
remoteAddress
);
log
.
warn
(
errorInfo
);
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
浏览文件 @
14a75e5c
...
...
@@ -207,9 +207,8 @@ public class ClientManageProcessor implements RequestProcessor {
private
RemotingCommand
createRetryTopic
(
ChannelHandlerContext
ctx
,
RemotingCommand
request
)
throws
RemotingCommandException
{
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
final
CreateRetryTopicRequestHeader
requestHeader
=
(
CreateRetryTopicRequestHeader
)
request
.
decodeCommandCustomHeader
(
CreateRetryTopicRequestHeader
.
class
);
final
CreateRetryTopicRequestHeader
requestHeader
=
(
CreateRetryTopicRequestHeader
)
request
.
decodeCommandCustomHeader
(
CreateRetryTopicRequestHeader
.
class
);
if
(
requestHeader
.
getGroupName
()
!=
null
)
{
SubscriptionGroupConfig
subscriptionGroupConfig
=
this
.
brokerController
.
getSubscriptionGroupManager
().
findSubscriptionGroupConfig
(
requestHeader
.
getGroupName
());
...
...
@@ -217,7 +216,8 @@ public class ClientManageProcessor implements RequestProcessor {
createRetryTopic
(
false
,
requestHeader
.
getGroupName
(),
subscriptionGroupConfig
.
getRetryQueueNums
());
}
}
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
return
response
;
}
...
...
broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
浏览文件 @
14a75e5c
...
...
@@ -46,6 +46,7 @@ import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import
org.apache.rocketmq.common.sysflag.TopicSysFlag
;
import
org.apache.rocketmq.remoting.RemotingChannel
;
import
org.apache.rocketmq.remoting.RequestProcessor
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.NettyChannelHandlerContextImpl
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
...
...
@@ -71,7 +72,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
SendMessageContext
mqtraceContext
;
switch
(
request
.
getCode
())
{
case
RequestCode
.
CONSUMER_SEND_MSG_BACK
:
return
this
.
consumerSendMsgBack
(
ctx
,
request
);
return
this
.
consumerSendMsgBack
(
request
);
default
:
SendMessageRequestHeader
requestHeader
=
parseRequestHeader
(
request
);
if
(
requestHeader
==
null
)
{
...
...
@@ -82,10 +83,13 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this
.
executeSendMessageHookBefore
(
ctx
,
request
,
mqtraceContext
);
RemotingCommand
response
;
// String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
SocketAddress
bornHost
=
ctx
.
channel
().
remoteAddress
();
if
(
requestHeader
.
isBatch
())
{
response
=
this
.
sendBatchMessage
(
ctx
,
request
,
mqtraceContext
,
requestHeader
);
response
=
this
.
sendBatchMessage
(
bornHost
,
request
,
mqtraceContext
,
requestHeader
);
}
else
{
response
=
this
.
sendMessage
(
ctx
,
request
,
mqtraceContext
,
requestHeader
);
response
=
this
.
sendMessage
(
bornHost
,
request
,
mqtraceContext
,
requestHeader
);
}
this
.
executeSendMessageHookAfter
(
response
,
mqtraceContext
);
...
...
@@ -99,7 +103,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
this
.
brokerController
.
getMessageStore
().
isTransientStorePoolDeficient
();
}
private
RemotingCommand
consumerSendMsgBack
(
final
ChannelHandlerContext
ctx
,
final
RemotingCommand
request
)
private
RemotingCommand
consumerSendMsgBack
(
final
RemotingCommand
request
)
throws
RemotingCommandException
{
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
null
);
final
ConsumerSendMsgBackRequestHeader
requestHeader
=
...
...
@@ -296,7 +300,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return
true
;
}
private
RemotingCommand
sendMessage
(
final
ChannelHandlerContext
ctx
,
private
RemotingCommand
sendMessage
(
final
SocketAddress
remoteAddress
,
final
RemotingCommand
request
,
final
SendMessageContext
sendMessageContext
,
final
SendMessageRequestHeader
requestHeader
)
throws
RemotingCommandException
{
...
...
@@ -319,7 +323,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
response
.
setCode
(-
1
);
super
.
msgCheck
(
ctx
,
requestHeader
,
response
);
super
.
msgCheck
(
RemotingHelper
.
parseChannelRemoteAddr
(
remoteAddress
)
,
requestHeader
,
response
);
if
(
response
.
getCode
()
!=
-
1
)
{
return
response
;
}
...
...
@@ -346,9 +350,12 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor
.
setProperties
(
msgInner
,
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
()));
msgInner
.
setPropertiesString
(
requestHeader
.
getProperties
());
msgInner
.
setBornTimestamp
(
requestHeader
.
getBornTimestamp
());
msgInner
.
setBornHost
(
ctx
.
channel
().
remoteAddress
());
msgInner
.
setBornHost
(
requestHeader
.
getBornHost
());
msgInner
.
setStoreHost
(
this
.
getStoreHost
());
msgInner
.
setReconsumeTimes
(
requestHeader
.
getReconsumeTimes
()
==
null
?
0
:
requestHeader
.
getReconsumeTimes
());
msgInner
.
setBornHost
(
remoteAddress
);
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String bornHost = msgInner.getStoreHostBytes(hostHolder).toString();
PutMessageResult
putMessageResult
=
null
;
Map
<
String
,
String
>
oriProps
=
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
());
String
traFlag
=
oriProps
.
get
(
MessageConst
.
PROPERTY_TRANSACTION_PREPARED
);
...
...
@@ -365,14 +372,14 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessage
(
msgInner
);
}
return
handlePutMessageResult
(
putMessageResult
,
response
,
request
,
msgInner
,
responseHeader
,
sendMessageContext
,
ctx
,
queueIdInt
);
return
handlePutMessageResult
(
putMessageResult
,
response
,
request
,
msgInner
,
responseHeader
,
sendMessageContext
,
queueIdInt
,
remoteAddress
);
}
private
RemotingCommand
handlePutMessageResult
(
PutMessageResult
putMessageResult
,
RemotingCommand
response
,
RemotingCommand
request
,
MessageExt
msg
,
SendMessageResponseHeader
responseHeader
,
SendMessageContext
sendMessageContext
,
ChannelHandlerContext
ctx
,
int
queueIdInt
)
{
SendMessageResponseHeader
responseHeader
,
SendMessageContext
sendMessageContext
,
int
queueIdInt
,
SocketAddress
bornHost
)
{
if
(
putMessageResult
==
null
)
{
response
.
setCode
(
ResponseCode
.
SYSTEM_ERROR
);
response
.
setRemark
(
"store putMessage return null"
);
...
...
@@ -445,8 +452,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
responseHeader
.
setCommitLogOffset
(
putMessageResult
.
getAppendMessageResult
().
getWroteOffset
());
responseHeader
.
setStoreTimestamp
(
putMessageResult
.
getAppendMessageResult
().
getStoreTimestamp
());
responseHeader
.
setStoreSize
(
putMessageResult
.
getAppendMessageResult
().
getWroteBytes
());
responseHeader
.
setStoreHost
(
ctx
.
channel
().
localAddress
().
toString
(
));
doResponse
(
ctx
,
request
,
response
);
responseHeader
.
setStoreHost
(
RemotingHelper
.
parseChannelRemoteAddr
(
bornHost
));
//
doResponse(ctx, request, response);
if
(
hasSendMessageHook
())
{
sendMessageContext
.
setMsgId
(
responseHeader
.
getMsgId
());
...
...
@@ -462,6 +469,10 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
sendMessageContext
.
setCommercialSendSize
(
wroteSize
);
sendMessageContext
.
setCommercialOwner
(
owner
);
}
if
(!
request
.
isOnewayRPC
())
{
response
.
setCustomHeader
(
responseHeader
);
return
response
;
}
return
null
;
}
else
{
if
(
hasSendMessageHook
())
{
...
...
@@ -477,7 +488,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
return
response
;
}
private
RemotingCommand
sendBatchMessage
(
final
ChannelHandlerContext
ctx
,
private
RemotingCommand
sendBatchMessage
(
final
SocketAddress
remoteAddress
,
final
RemotingCommand
request
,
final
SendMessageContext
sendMessageContext
,
final
SendMessageRequestHeader
requestHeader
)
throws
RemotingCommandException
{
...
...
@@ -500,7 +511,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
}
response
.
setCode
(-
1
);
super
.
msgCheck
(
ctx
,
requestHeader
,
response
);
super
.
msgCheck
(
RemotingHelper
.
parseChannelRemoteAddr
(
remoteAddress
)
,
requestHeader
,
response
);
if
(
response
.
getCode
()
!=
-
1
)
{
return
response
;
}
...
...
@@ -537,13 +548,17 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
MessageAccessor
.
setProperties
(
messageExtBatch
,
MessageDecoder
.
string2messageProperties
(
requestHeader
.
getProperties
()));
messageExtBatch
.
setBody
(
request
.
getBody
());
messageExtBatch
.
setBornTimestamp
(
requestHeader
.
getBornTimestamp
());
messageExtBatch
.
setBornHost
(
ctx
.
channel
().
remoteAddress
());
messageExtBatch
.
setBornHost
(
requestHeader
.
getBornHost
());
messageExtBatch
.
setStoreHost
(
this
.
getStoreHost
());
messageExtBatch
.
setReconsumeTimes
(
requestHeader
.
getReconsumeTimes
()
==
null
?
0
:
requestHeader
.
getReconsumeTimes
());
// ByteBuffer hostHolder = ByteBuffer.allocate(8);
// String storeHost = messageExtBatch.getStoreHostBytes(hostHolder).toString();
PutMessageResult
putMessageResult
=
this
.
brokerController
.
getMessageStore
().
putMessages
(
messageExtBatch
);
return
handlePutMessageResult
(
putMessageResult
,
response
,
request
,
messageExtBatch
,
responseHeader
,
sendMessageContext
,
ctx
,
queueIdIn
t
);
return
handlePutMessageResult
(
putMessageResult
,
response
,
request
,
messageExtBatch
,
responseHeader
,
sendMessageContext
,
queueIdInt
,
storeHos
t
);
}
public
boolean
hasConsumeMessageHook
()
{
...
...
common/src/main/java/org/apache/rocketmq/common/SnodeConfig.java
浏览文件 @
14a75e5c
...
...
@@ -22,6 +22,8 @@ import org.apache.rocketmq.common.annotation.ImportantField;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.ClientConfig
;
import
org.apache.rocketmq.remoting.ServerConfig
;
import
org.apache.rocketmq.remoting.common.RemotingUtil
;
public
class
SnodeConfig
{
...
...
@@ -32,6 +34,10 @@ public class SnodeConfig {
private
String
rocketmqHome
=
System
.
getProperty
(
MixAll
.
ROCKETMQ_HOME_PROPERTY
,
System
.
getenv
(
MixAll
.
ROCKETMQ_HOME_ENV
));
private
ServerConfig
nettyServerConfig
;
private
ClientConfig
nettyClientConfig
;
@ImportantField
private
String
namesrvAddr
=
System
.
getProperty
(
MixAll
.
NAMESRV_ADDR_PROPERTY
,
System
.
getenv
(
MixAll
.
NAMESRV_ADDR_ENV
));
...
...
@@ -106,6 +112,9 @@ public class SnodeConfig {
@ImportantField
private
boolean
aclEnable
=
false
;
@ImportantField
private
boolean
embeddedModeEnable
=
true
;
public
void
setSnodeHeartBeatInterval
(
long
snodeHeartBeatInterval
)
{
this
.
snodeHeartBeatInterval
=
snodeHeartBeatInterval
;
}
...
...
@@ -410,4 +419,27 @@ public class SnodeConfig {
this
.
loadOffsetInterval
=
loadOffsetInterval
;
}
public
boolean
isEmbeddedModeEnable
()
{
return
embeddedModeEnable
;
}
public
void
setEmbeddedModeEnable
(
boolean
embeddedModeEnable
)
{
this
.
embeddedModeEnable
=
embeddedModeEnable
;
}
public
ServerConfig
getNettyServerConfig
()
{
return
nettyServerConfig
;
}
public
void
setNettyServerConfig
(
ServerConfig
nettyServerConfig
)
{
this
.
nettyServerConfig
=
nettyServerConfig
;
}
public
ClientConfig
getNettyClientConfig
()
{
return
nettyClientConfig
;
}
public
void
setNettyClientConfig
(
ClientConfig
nettyClientConfig
)
{
this
.
nettyClientConfig
=
nettyClientConfig
;
}
}
snode/pom.xml
浏览文件 @
14a75e5c
...
...
@@ -15,7 +15,8 @@
limitations under the License.
-->
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<project
xmlns=
"http://maven.apache.org/POM/4.0.0"
xmlns:xsi=
"http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation=
"http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<parent>
<groupId>
org.apache.rocketmq
</groupId>
<artifactId>
rocketmq-all
</artifactId>
...
...
@@ -92,6 +93,10 @@
<groupId>
io.prometheus
</groupId>
<artifactId>
simpleclient_hotspot
</artifactId>
</dependency>
<dependency>
<groupId>
${project.groupId}
</groupId>
<artifactId>
rocketmq-broker
</artifactId>
</dependency>
</dependencies>
<build>
...
...
snode/src/main/java/org/apache/rocketmq/snode/SnodeController.java
浏览文件 @
14a75e5c
...
...
@@ -23,6 +23,7 @@ import java.util.concurrent.Executors;
import
java.util.concurrent.ScheduledExecutorService
;
import
java.util.concurrent.TimeUnit
;
import
org.apache.rocketmq.acl.AccessValidator
;
import
org.apache.rocketmq.broker.BrokerStartup
;
import
org.apache.rocketmq.common.SnodeConfig
;
import
org.apache.rocketmq.common.ThreadFactoryImpl
;
import
org.apache.rocketmq.common.constant.LoggerName
;
...
...
@@ -69,10 +70,11 @@ import org.apache.rocketmq.snode.service.PushService;
import
org.apache.rocketmq.snode.service.ScheduledService
;
import
org.apache.rocketmq.snode.service.WillMessageService
;
import
org.apache.rocketmq.snode.service.impl.ClientServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.EnodeServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.
Local
EnodeServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.MetricsServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.NnodeServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.PushServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.RemoteEnodeServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.ScheduledServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.WillMessageServiceImpl
;
...
...
@@ -127,7 +129,11 @@ public class SnodeController {
this
.
nettyClientConfig
=
nettyClientConfig
;
this
.
nettyServerConfig
=
nettyServerConfig
;
this
.
snodeConfig
=
snodeConfig
;
this
.
enodeService
=
new
EnodeServiceImpl
(
this
);
if
(!
this
.
snodeConfig
.
isEmbeddedModeEnable
())
{
this
.
enodeService
=
new
RemoteEnodeServiceImpl
(
this
);
}
else
{
this
.
enodeService
=
new
LocalEnodeServiceImpl
(
BrokerStartup
.
getBrokerController
());
}
this
.
nnodeService
=
new
NnodeServiceImpl
(
this
);
this
.
scheduledService
=
new
ScheduledServiceImpl
(
this
);
this
.
remotingClient
=
RemotingClientFactory
.
getInstance
().
createRemotingClient
()
...
...
@@ -163,7 +169,6 @@ public class SnodeController {
"SnodeHeartbeatThread"
,
true
);
this
.
consumerManageExecutor
=
ThreadUtils
.
newThreadPoolExecutor
(
snodeConfig
.
getSnodeSendMessageMinPoolSize
(),
snodeConfig
.
getSnodeSendMessageMaxPoolSize
(),
...
...
@@ -228,10 +233,8 @@ public class SnodeController {
}
public
boolean
initialize
()
{
this
.
snodeServer
=
RemotingServerFactory
.
getInstance
().
createRemotingServer
()
.
init
(
this
.
nettyServerConfig
,
this
.
clientHousekeepingService
);
this
.
mqttRemotingServer
=
RemotingServerFactory
.
getInstance
().
createRemotingServer
(
RemotingUtil
.
MQTT_PROTOCOL
)
this
.
snodeServer
=
RemotingServerFactory
.
getInstance
().
createRemotingServer
().
init
(
this
.
nettyServerConfig
,
this
.
clientHousekeepingService
);
this
.
mqttRemotingServer
=
RemotingServerFactory
.
getInstance
().
createRemotingServer
(
RemotingUtil
.
MQTT_PROTOCOL
)
.
init
(
this
.
nettyServerConfig
,
this
.
clientHousekeepingService
);
this
.
registerProcessor
();
initSnodeInterceptorGroup
();
...
...
snode/src/main/java/org/apache/rocketmq/snode/SnodeStartup.java
浏览文件 @
14a75e5c
...
...
@@ -16,8 +16,6 @@
*/
package
org.apache.rocketmq.snode
;
import
static
org
.
apache
.
rocketmq
.
remoting
.
netty
.
TlsSystemConfig
.
TLS_ENABLE
;
import
ch.qos.logback.classic.LoggerContext
;
import
ch.qos.logback.classic.joran.JoranConfigurator
;
import
ch.qos.logback.core.joran.spi.JoranException
;
...
...
@@ -31,6 +29,7 @@ import org.apache.commons.cli.CommandLine;
import
org.apache.commons.cli.Option
;
import
org.apache.commons.cli.Options
;
import
org.apache.commons.cli.PosixParser
;
import
org.apache.rocketmq.broker.BrokerStartup
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.SnodeConfig
;
import
org.apache.rocketmq.common.constant.LoggerName
;
...
...
@@ -44,6 +43,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import
org.apache.rocketmq.srvutil.ServerUtil
;
import
org.slf4j.LoggerFactory
;
import
static
org
.
apache
.
rocketmq
.
remoting
.
netty
.
TlsSystemConfig
.
TLS_ENABLE
;
public
class
SnodeStartup
{
private
static
InternalLogger
log
;
public
static
Properties
properties
=
null
;
...
...
@@ -51,13 +52,17 @@ public class SnodeStartup {
public
static
String
configFile
=
null
;
public
static
void
main
(
String
[]
args
)
throws
IOException
,
JoranException
{
startup
(
createSnodeController
(
args
));
SnodeConfig
snodeConfig
=
loadConfig
(
args
);
if
(
snodeConfig
.
isEmbeddedModeEnable
())
{
BrokerStartup
.
start
(
BrokerStartup
.
createBrokerController
(
args
));
}
SnodeController
snodeController
=
createSnodeController
(
snodeConfig
);
startup
(
snodeController
);
}
public
static
SnodeController
startup
(
SnodeController
controller
)
{
try
{
controller
.
start
();
String
tip
=
"The snode["
+
controller
.
getSnodeConfig
().
getSnodeName
()
+
", "
+
controller
.
getSnodeConfig
().
getSnodeIP1
()
+
"] boot success. serializeType="
+
RemotingCommand
.
getSerializeTypeConfigInThisServer
();
...
...
@@ -74,7 +79,7 @@ public class SnodeStartup {
return
null
;
}
public
static
SnodeCon
troller
createSnodeController
(
String
[]
args
)
throws
IOException
,
Joran
Exception
{
public
static
SnodeCon
fig
loadConfig
(
String
[]
args
)
throws
IO
Exception
{
Options
options
=
ServerUtil
.
buildCommandlineOptions
(
new
Options
());
commandLine
=
ServerUtil
.
parseCmdLine
(
"snode"
,
args
,
buildCommandlineOptions
(
options
),
new
PosixParser
());
...
...
@@ -82,12 +87,11 @@ public class SnodeStartup {
System
.
exit
(-
1
);
}
final
SnodeConfig
snodeConfig
=
new
SnodeConfig
();
SnodeConfig
snodeConfig
=
new
SnodeConfig
();
final
ServerConfig
nettyServerConfig
=
new
ServerConfig
();
final
ClientConfig
nettyClientConfig
=
new
ClientConfig
();
nettyServerConfig
.
setListenPort
(
snodeConfig
.
getListenPort
());
nettyServerConfig
.
setListenPort
(
11911
);
nettyClientConfig
.
setUseTLS
(
Boolean
.
parseBoolean
(
System
.
getProperty
(
TLS_ENABLE
,
String
.
valueOf
(
TlsSystemConfig
.
tlsMode
==
TlsMode
.
ENFORCING
))));
...
...
@@ -101,27 +105,28 @@ public class SnodeStartup {
MixAll
.
properties2Object
(
properties
,
snodeConfig
);
MixAll
.
properties2Object
(
properties
,
nettyServerConfig
);
MixAll
.
properties2Object
(
properties
,
nettyClientConfig
);
in
.
close
();
}
}
snodeConfig
.
setNettyServerConfig
(
nettyServerConfig
);
snodeConfig
.
setNettyClientConfig
(
nettyClientConfig
);
if
(
null
==
snodeConfig
.
getRocketmqHome
())
{
System
.
out
.
printf
(
"Please set the %s variable in your environment to match the location of the RocketMQ installation"
,
MixAll
.
ROCKETMQ_HOME_ENV
);
System
.
exit
(-
2
);
}
LoggerContext
lc
=
(
LoggerContext
)
LoggerFactory
.
getILoggerFactory
();
JoranConfigurator
configurator
=
new
JoranConfigurator
();
configurator
.
setContext
(
lc
);
lc
.
reset
();
configurator
.
doConfigure
(
snodeConfig
.
getRocketmqHome
()
+
"/conf/logback_snode.xml"
);
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
SNODE_LOGGER_NAME
);
MixAll
.
printObjectProperties
(
log
,
snodeConfig
);
MixAll
.
printObjectProperties
(
log
,
nettyClientConfig
);
MixAll
.
printObjectProperties
(
log
,
nettyServerConfig
);
MixAll
.
printObjectProperties
(
log
,
snodeConfig
.
getNettyServerConfig
());
MixAll
.
printObjectProperties
(
log
,
snodeConfig
.
getNettyClientConfig
());
return
snodeConfig
;
}
public
static
SnodeController
createSnodeController
(
SnodeConfig
snodeConfig
)
throws
JoranException
{
final
SnodeController
snodeController
=
new
SnodeController
(
nettyServerConfig
,
nettyClientConfig
,
snodeConfig
.
getNettyServerConfig
()
,
snodeConfig
.
getNettyClientConfig
()
,
snodeConfig
);
boolean
initResult
=
snodeController
.
initialize
();
...
...
@@ -148,7 +153,14 @@ public class SnodeStartup {
}
}
}
},
"ShutdownHook"
));
},
"ShutdownHook"
));
LoggerContext
lc
=
(
LoggerContext
)
LoggerFactory
.
getILoggerFactory
();
JoranConfigurator
configurator
=
new
JoranConfigurator
();
configurator
.
setContext
(
lc
);
lc
.
reset
();
configurator
.
doConfigure
(
snodeConfig
.
getRocketmqHome
()
+
"/conf/logback_snode.xml"
);
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
SNODE_LOGGER_NAME
);
return
snodeController
;
}
...
...
@@ -164,7 +176,7 @@ public class SnodeStartup {
opt
=
new
Option
(
"m"
,
"printImportantConfig"
,
false
,
"Print important config item"
);
opt
.
setRequired
(
false
);
options
.
addOption
(
opt
);
return
options
;
}
}
...
...
snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionGroupManager.java
浏览文件 @
14a75e5c
...
...
@@ -42,19 +42,6 @@ public class SubscriptionGroupManager {
private
void
init
()
{
}
public
void
updateSubscriptionGroupConfig
(
final
SubscriptionGroupConfig
config
)
{
SubscriptionGroupConfig
old
=
this
.
subscriptionGroupTable
.
put
(
config
.
getGroupName
(),
config
);
if
(
old
!=
null
)
{
log
.
info
(
"Update subscription group config, old: {} new: {}"
,
old
,
config
);
}
else
{
log
.
info
(
"Create new subscription group, {}"
,
config
);
}
this
.
dataVersion
.
nextVersion
();
this
.
persistSubscription
(
config
);
}
public
void
disableConsume
(
final
String
groupName
)
{
SubscriptionGroupConfig
old
=
this
.
subscriptionGroupTable
.
get
(
groupName
);
if
(
old
!=
null
)
{
...
...
@@ -63,7 +50,8 @@ public class SubscriptionGroupManager {
}
}
public
SubscriptionGroupConfig
findSubscriptionGroupConfig
(
final
String
group
)
{
public
SubscriptionGroupConfig
findSubscriptionGroupConfig
(
final
String
group
)
{
SubscriptionGroupConfig
subscriptionGroupConfig
=
this
.
subscriptionGroupTable
.
get
(
group
);
if
(
null
==
subscriptionGroupConfig
)
{
if
(
snodeController
.
getSnodeConfig
().
isAutoCreateSubscriptionGroup
()
||
MixAll
.
isSysConsumerGroup
(
group
))
{
...
...
@@ -74,15 +62,13 @@ public class SubscriptionGroupManager {
log
.
info
(
"Auto create a subscription group, {}"
,
subscriptionGroupConfig
.
toString
());
}
this
.
dataVersion
.
nextVersion
();
this
.
persistSubscription
(
subscriptionGroupConfig
);
this
.
snodeController
.
getEnodeService
().
persistSubscriptionGroupConfig
(
subscriptionGroupConfig
);
}
}
return
subscriptionGroupConfig
;
}
public
ConcurrentMap
<
String
,
SubscriptionGroupConfig
>
getSubscriptionGroupTable
()
{
return
subscriptionGroupTable
;
}
...
...
@@ -91,18 +77,4 @@ public class SubscriptionGroupManager {
return
dataVersion
;
}
public
void
deleteSubscriptionGroupConfig
(
final
String
groupName
)
{
SubscriptionGroupConfig
old
=
this
.
subscriptionGroupTable
.
remove
(
groupName
);
if
(
old
!=
null
)
{
log
.
info
(
"delete subscription group OK, subscription group:{}"
,
old
);
this
.
dataVersion
.
nextVersion
();
this
.
persistSubscription
(
old
);
}
else
{
log
.
warn
(
"delete subscription group failed, subscription groupName: {} not exist"
,
groupName
);
}
}
void
persistSubscription
(
SubscriptionGroupConfig
config
)
{
this
.
snodeController
.
getEnodeService
().
persistSubscriptionGroupConfig
(
config
);
}
}
snode/src/main/java/org/apache/rocketmq/snode/client/SubscriptionManager.java
浏览文件 @
14a75e5c
...
...
@@ -29,11 +29,6 @@ public interface SubscriptionManager {
boolean
subscribe
(
String
groupId
,
Set
<
SubscriptionData
>
subscriptionDataSet
,
ConsumeType
consumeType
,
MessageModel
messageModel
,
ConsumeFromWhere
consumeFromWhere
);
void
unSubscribe
(
String
groupId
,
RemotingChannel
remotingChannel
,
Set
<
SubscriptionData
>
subscriptionDataSet
);
void
cleanSubscription
(
String
groupId
,
String
topic
);
Subscription
getSubscription
(
String
groupId
);
void
registerPushSession
(
Set
<
SubscriptionData
>
subscriptionDataSet
,
RemotingChannel
remotingChannel
,
...
...
snode/src/main/java/org/apache/rocketmq/snode/client/impl/SubscriptionManagerImpl.java
浏览文件 @
14a75e5c
...
...
@@ -44,7 +44,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
@Override
public
void
registerPushSession
(
Set
<
SubscriptionData
>
subscriptionDataSet
,
RemotingChannel
remotingChannel
,
String
groupId
)
{
log
.
debug
(
"Before ConsumerGroup: {} RemotingChannel: {} subscription: {}"
,
groupId
,
remotingChannel
.
remoteAddress
(),
subscriptionDataSet
);
log
.
info
(
"Before ConsumerGroup: {} RemotingChannel: {} subscription: {}"
,
groupId
,
remotingChannel
.
remoteAddress
(),
subscriptionDataSet
);
Set
<
MessageQueue
>
prevSubSet
=
this
.
clientSubscriptionTable
.
get
(
remotingChannel
);
Set
<
MessageQueue
>
keySet
=
new
HashSet
<>();
for
(
SubscriptionData
subscriptionData
:
subscriptionDataSet
)
{
...
...
@@ -77,7 +77,7 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
}
}
}
log
.
debug
(
"After ConsumerGroup: {} RemotingChannel: {} subscription: {}"
,
groupId
,
remotingChannel
.
remoteAddress
(),
this
.
clientSubscriptionTable
.
get
(
remotingChannel
));
log
.
info
(
"After ConsumerGroup: {} RemotingChannel: {} subscription: {}"
,
groupId
,
remotingChannel
.
remoteAddress
(),
this
.
clientSubscriptionTable
.
get
(
remotingChannel
));
}
@Override
...
...
@@ -191,17 +191,6 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
return
updated
;
}
@Override
public
void
unSubscribe
(
String
groupId
,
RemotingChannel
remotingChannel
,
Set
<
SubscriptionData
>
subscriptionDataSet
)
{
}
@Override
public
void
cleanSubscription
(
String
groupId
,
String
topic
)
{
}
@Override
public
Subscription
getSubscription
(
String
groupId
)
{
return
groupSubscriptionTable
.
get
(
groupId
);
...
...
snode/src/main/java/org/apache/rocketmq/snode/offset/ConsumerOffsetManager.java
浏览文件 @
14a75e5c
...
...
@@ -20,13 +20,9 @@ import java.util.concurrent.ConcurrentHashMap;
import
java.util.concurrent.ConcurrentMap
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
import
org.apache.rocketmq.remoting.exception.RemotingTimeoutException
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.remoting.RemotingChannel
;
import
org.apache.rocketmq.snode.SnodeController
;
import
org.apache.rocketmq.snode.exception.SnodeException
;
...
...
@@ -79,17 +75,18 @@ public class ConsumerOffsetManager {
}
}
private
long
parserOffset
(
final
String
enodeName
,
final
String
group
,
final
String
topic
,
final
int
queueId
)
{
try
{
RemotingCommand
remotingCommand
=
queryOffset
(
enodeName
,
group
,
topic
,
queueId
);
QueryConsumerOffsetResponseHeader
responseHeader
=
(
QueryConsumerOffsetResponseHeader
)
remotingCommand
.
decodeCommandCustomHeader
(
QueryConsumerOffsetResponseHeader
.
class
);
return
responseHeader
.
getOffset
();
}
catch
(
Exception
ex
)
{
log
.
error
(
"Load offset from broker error"
,
ex
);
}
return
-
1
;
}
// private long parserOffset(final RemotingChannel remotingChannel, final String enodeName, final String group,
// final String topic, final int queueId) {
// try {
// RemotingCommand remotingCommand = queryOffset(remotingChannel, enodeName, group, topic, queueId);
// QueryConsumerOffsetResponseHeader responseHeader =
// (QueryConsumerOffsetResponseHeader) remotingCommand.decodeCommandCustomHeader(QueryConsumerOffsetResponseHeader.class);
// return responseHeader.getOffset();
// } catch (Exception ex) {
// log.error("Load offset from broker error", ex);
// }
// return -1;
// }
public
long
queryCacheOffset
(
final
String
enodeName
,
final
String
group
,
final
String
topic
,
final
int
queueId
)
{
String
key
=
buildKey
(
enodeName
,
topic
,
group
);
...
...
@@ -99,30 +96,33 @@ public class ConsumerOffsetManager {
map
=
this
.
offsetTable
.
putIfAbsent
(
key
,
map
);
}
CacheOffset
cacheOffset
=
map
.
get
(
queueId
);
if
(
cacheOffset
!=
null
)
{
if
(
System
.
currentTimeMillis
()
-
cacheOffset
.
getUpdateTimestamp
()
>
snodeController
.
getSnodeConfig
().
getLoadOffsetInterval
())
{
cacheOffset
.
setOffset
(
parserOffset
(
enodeName
,
group
,
topic
,
queueId
));
cacheOffset
.
setUpdateTimestamp
(
System
.
currentTimeMillis
());
try
{
if
(
cacheOffset
!=
null
)
{
if
(!
this
.
snodeController
.
getSnodeConfig
().
isEmbeddedModeEnable
()
&&
System
.
currentTimeMillis
()
-
cacheOffset
.
getUpdateTimestamp
()
>
snodeController
.
getSnodeConfig
().
getLoadOffsetInterval
())
{
long
offset
=
this
.
snodeController
.
getEnodeService
().
queryOffset
(
enodeName
,
group
,
topic
,
queueId
);
cacheOffset
.
setOffset
(
offset
);
cacheOffset
.
setUpdateTimestamp
(
System
.
currentTimeMillis
());
}
else
{
long
offset
=
this
.
snodeController
.
getEnodeService
().
queryOffset
(
enodeName
,
group
,
topic
,
queueId
);
cacheOffset
.
setOffset
(
offset
);
}
}
else
{
long
offset
=
this
.
snodeController
.
getEnodeService
().
queryOffset
(
enodeName
,
group
,
topic
,
queueId
);
cacheOffset
=
new
CacheOffset
(
key
,
offset
,
System
.
currentTimeMillis
());
map
.
put
(
queueId
,
cacheOffset
);
}
}
else
{
cacheOffset
=
new
CacheOffset
(
key
,
parserOffset
(
enodeName
,
group
,
topic
,
queueId
),
System
.
currentTimeMillis
());
map
.
put
(
queueId
,
cacheOffset
);
}
catch
(
Exception
ex
)
{
log
.
warn
(
"Load offset error, enodeName: {}, group:{},topic:{} queueId:{}"
,
enodeName
,
group
,
topic
,
queueId
);
}
return
cacheOffset
.
getOffset
();
}
public
void
commitOffset
(
final
String
enodeName
,
final
String
clientHost
,
final
String
group
,
final
String
topic
,
public
void
commitOffset
(
final
RemotingChannel
remotingChannel
,
final
String
enodeName
,
final
String
clientHost
,
final
String
group
,
final
String
topic
,
final
int
queueId
,
final
long
offset
)
{
cacheOffset
(
enodeName
,
clientHost
,
group
,
topic
,
queueId
,
offset
);
this
.
snodeController
.
getEnodeService
().
persistOffset
(
enodeName
,
group
,
topic
,
queueId
,
offset
);
}
public
RemotingCommand
queryOffset
(
final
String
enodeName
,
final
String
group
,
final
String
topic
,
final
int
queueId
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
return
this
.
snodeController
.
getEnodeService
().
loadOffset
(
enodeName
,
group
,
topic
,
queueId
);
this
.
snodeController
.
getEnodeService
().
persistOffset
(
remotingChannel
,
enodeName
,
group
,
topic
,
queueId
,
offset
);
}
public
class
CacheOffset
{
...
...
snode/src/main/java/org/apache/rocketmq/snode/processor/ConsumerManageProcessor.java
浏览文件 @
14a75e5c
...
...
@@ -25,9 +25,13 @@ import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestH
import
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody
;
import
org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader
;
import
org.apache.rocketmq.logging.InternalLogger
;
...
...
@@ -87,7 +91,14 @@ public class ConsumerManageProcessor implements RequestProcessor {
(
SearchOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
SearchOffsetRequestHeader
.
class
);
try
{
return
this
.
snodeController
.
getEnodeService
().
getOffsetByTimestamp
(
requestHeader
.
getEnodeName
(),
request
);
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
SearchOffsetResponseHeader
.
class
);
final
SearchOffsetResponseHeader
responseHeader
=
(
SearchOffsetResponseHeader
)
response
.
readCustomHeader
();
long
offset
=
this
.
snodeController
.
getEnodeService
().
getOffsetByTimestamp
(
requestHeader
.
getEnodeName
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
(),
requestHeader
.
getTimestamp
(),
request
);
responseHeader
.
setOffset
(
offset
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
}
catch
(
Exception
ex
)
{
log
.
error
(
"Search offset by timestamp error:{}"
,
ex
);
}
...
...
@@ -100,7 +111,14 @@ public class ConsumerManageProcessor implements RequestProcessor {
(
GetMinOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
GetMinOffsetRequestHeader
.
class
);
try
{
return
this
.
snodeController
.
getEnodeService
().
getMinOffsetInQueue
(
requestHeader
.
getEnodeName
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
GetMinOffsetResponseHeader
.
class
);
final
GetMinOffsetResponseHeader
responseHeader
=
(
GetMinOffsetResponseHeader
)
response
.
readCustomHeader
();
long
offset
=
this
.
snodeController
.
getEnodeService
().
getMinOffsetInQueue
(
requestHeader
.
getEnodeName
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
(),
request
);
responseHeader
.
setOffset
(
offset
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
}
catch
(
Exception
ex
)
{
log
.
error
(
"Get min offset error:{}"
,
ex
);
}
...
...
@@ -113,9 +131,16 @@ public class ConsumerManageProcessor implements RequestProcessor {
(
GetMaxOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
GetMaxOffsetRequestHeader
.
class
);
try
{
return
this
.
snodeController
.
getEnodeService
().
getMaxOffsetInQueue
(
requestHeader
.
getEnodeName
(),
request
);
long
offset
=
this
.
snodeController
.
getEnodeService
().
getMaxOffsetInQueue
(
requestHeader
.
getEnodeName
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
(),
request
);
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
GetMaxOffsetResponseHeader
.
class
);
final
GetMaxOffsetResponseHeader
responseHeader
=
(
GetMaxOffsetResponseHeader
)
response
.
readCustomHeader
();
responseHeader
.
setOffset
(
offset
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
return
response
;
}
catch
(
Exception
ex
)
{
log
.
error
(
"Get min offset error
:{}"
,
ex
);
log
.
error
(
"Get min offset error
, remoting: {} error: {} "
,
remotingChannel
.
remoteAddress
()
,
ex
);
}
return
null
;
}
...
...
@@ -153,7 +178,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
final
UpdateConsumerOffsetRequestHeader
requestHeader
=
(
UpdateConsumerOffsetRequestHeader
)
request
.
decodeCommandCustomHeader
(
UpdateConsumerOffsetRequestHeader
.
class
);
this
.
snodeController
.
getConsumerOffsetManager
().
commitOffset
(
requestHeader
.
getEnodeName
(),
RemotingHelper
.
parseChannelRemoteAddr
(
remotingChannel
.
remoteAddress
()),
requestHeader
.
getConsumerGroup
(),
this
.
snodeController
.
getConsumerOffsetManager
().
commitOffset
(
re
motingChannel
,
re
questHeader
.
getEnodeName
(),
RemotingHelper
.
parseChannelRemoteAddr
(
remotingChannel
.
remoteAddress
()),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
(),
requestHeader
.
getCommitOffset
());
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
...
...
@@ -171,9 +196,15 @@ public class ConsumerManageProcessor implements RequestProcessor {
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
return
this
.
snodeController
.
getConsumerOffsetManager
().
queryOffset
(
requestHeader
.
getEnodeName
(),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
long
offset
=
this
.
snodeController
.
getEnodeService
().
queryOffset
(
requestHeader
.
getEnodeName
(),
requestHeader
.
getConsumerGroup
(),
requestHeader
.
getTopic
(),
requestHeader
.
getQueueId
());
final
RemotingCommand
response
=
RemotingCommand
.
createResponseCommand
(
QueryConsumerOffsetResponseHeader
.
class
);
final
QueryConsumerOffsetResponseHeader
responseHeader
=
(
QueryConsumerOffsetResponseHeader
)
response
.
readCustomHeader
();
responseHeader
.
setOffset
(
offset
);
response
.
setCode
(
ResponseCode
.
SUCCESS
);
response
.
setRemark
(
null
);
return
response
;
}
public
RemotingCommand
createRetryTopic
(
RemotingChannel
remotingChannel
,
...
...
@@ -181,7 +212,7 @@ public class ConsumerManageProcessor implements RequestProcessor {
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
{
final
CreateRetryTopicRequestHeader
requestHeader
=
(
CreateRetryTopicRequestHeader
)
request
.
decodeCommandCustomHeader
(
CreateRetryTopicRequestHeader
.
class
);
requestHeader
.
getEnodeName
();
return
this
.
snodeController
.
getEnodeService
().
creatRetryTopic
(
requestHeader
.
getEnodeName
(),
request
);
return
this
.
snodeController
.
getEnodeService
().
creatRetryTopic
(
re
motingChannel
,
re
questHeader
.
getEnodeName
(),
request
);
}
}
snode/src/main/java/org/apache/rocketmq/snode/processor/PullMessageProcessor.java
浏览文件 @
14a75e5c
...
...
@@ -127,14 +127,16 @@ public class PullMessageProcessor implements RequestProcessor {
}
}
CompletableFuture
<
RemotingCommand
>
responseFuture
=
snodeController
.
getEnodeService
().
pullMessage
(
requestHeader
.
getEnodeName
(),
request
);
CompletableFuture
<
RemotingCommand
>
responseFuture
=
snodeController
.
getEnodeService
().
pullMessage
(
re
motingChannel
,
re
questHeader
.
getEnodeName
(),
request
);
responseFuture
.
whenComplete
((
data
,
ex
)
->
{
if
(
ex
==
null
)
{
if
(
this
.
snodeController
.
getConsumeMessageInterceptorGroup
()
!=
null
)
{
ResponseContext
responseContext
=
new
ResponseContext
(
request
,
remotingChannel
,
data
);
this
.
snodeController
.
getSendMessageInterceptorGroup
().
afterRequest
(
responseContext
);
}
remotingChannel
.
reply
(
data
);
if
(
data
!=
null
)
{
remotingChannel
.
reply
(
data
);
}
}
else
{
if
(
this
.
snodeController
.
getConsumeMessageInterceptorGroup
()
!=
null
)
{
ExceptionContext
exceptionContext
=
new
ExceptionContext
(
request
,
remotingChannel
,
ex
,
null
);
...
...
snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
浏览文件 @
14a75e5c
...
...
@@ -83,11 +83,11 @@ public class SendMessageProcessor implements RequestProcessor {
stringBuffer
.
append
(
MixAll
.
getRetryTopic
(
consumerSendMsgBackRequestHeader
.
getGroup
()));
}
CompletableFuture
<
RemotingCommand
>
responseFuture
=
snodeController
.
getEnodeService
().
sendMessage
(
enodeName
,
request
);
CompletableFuture
<
RemotingCommand
>
responseFuture
=
snodeController
.
getEnodeService
().
sendMessage
(
remotingChannel
,
enodeName
,
request
);
sendMessageRequestHeaderV2
.
setO
(
remotingChannel
.
remoteAddress
());
final
byte
[]
message
=
request
.
getBody
();
final
boolean
isN
eedPush
=
!
isSendBack
;
final
boolean
n
eedPush
=
!
isSendBack
;
final
SendMessageRequestHeader
sendMessageRequestHeader
=
SendMessageRequestHeaderV2
.
createSendMessageRequestHeaderV1
(
sendMessageRequestHeaderV2
);
responseFuture
.
whenComplete
((
data
,
ex
)
->
{
...
...
@@ -98,7 +98,7 @@ public class SendMessageProcessor implements RequestProcessor {
}
remotingChannel
.
reply
(
data
);
this
.
snodeController
.
getMetricsService
().
recordRequestSize
(
stringBuffer
.
toString
(),
request
.
getBody
().
length
);
if
(
data
.
getCode
()
==
ResponseCode
.
SUCCESS
&&
isN
eedPush
)
{
if
(
data
.
getCode
()
==
ResponseCode
.
SUCCESS
&&
n
eedPush
)
{
this
.
snodeController
.
getPushService
().
pushMessage
(
sendMessageRequestHeader
,
message
,
data
);
}
}
else
{
...
...
snode/src/main/java/org/apache/rocketmq/snode/processor/mqtthandler/MqttPingreqMessageHandler.java
浏览文件 @
14a75e5c
...
...
@@ -29,6 +29,7 @@ public class MqttPingreqMessageHandler implements MessageHandler {
public
MqttPingreqMessageHandler
(
SnodeController
snodeController
)
{
this
.
snodeController
=
snodeController
;
}
/**
* handle the PINGREQ message from client
* <ol>
...
...
@@ -41,7 +42,8 @@ public class MqttPingreqMessageHandler implements MessageHandler {
* @param message
* @return
*/
@Override
public
RemotingCommand
handleMessage
(
MqttMessage
message
,
RemotingChannel
remotingChannel
)
{
@Override
public
RemotingCommand
handleMessage
(
MqttMessage
message
,
RemotingChannel
remotingChannel
)
{
return
null
;
}
}
snode/src/main/java/org/apache/rocketmq/snode/service/AdminService.java
浏览文件 @
14a75e5c
package
org.apache.rocketmq.snode.service
;
/*
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
...
...
@@ -14,7 +14,7 @@ package org.apache.rocketmq.snode.service;/*
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.snode.service
;
public
interface
AdminService
{
}
snode/src/main/java/org/apache/rocketmq/snode/service/EnodeService.java
浏览文件 @
14a75e5c
...
...
@@ -21,6 +21,7 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
import
org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2
;
import
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig
;
import
org.apache.rocketmq.remoting.RemotingChannel
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
...
...
@@ -42,7 +43,8 @@ public interface EnodeService {
* @param request {@link SendMessageRequestHeaderV2} Send message request header
* @return Send message response future
*/
CompletableFuture
<
RemotingCommand
>
sendMessage
(
final
String
enodeName
,
final
RemotingCommand
request
);
CompletableFuture
<
RemotingCommand
>
sendMessage
(
final
RemotingChannel
remotingChannel
,
final
String
enodeName
,
final
RemotingCommand
request
);
/**
* Pull message from enode server.
...
...
@@ -51,7 +53,8 @@ public interface EnodeService {
* @param request {@link PullMessageRequestHeader} Pull message request header
* @return Pull message Response future
*/
CompletableFuture
<
RemotingCommand
>
pullMessage
(
final
String
enodeName
,
final
RemotingCommand
request
);
CompletableFuture
<
RemotingCommand
>
pullMessage
(
final
RemotingChannel
remotingChannel
,
final
String
enodeName
,
final
RemotingCommand
request
);
/**
* Create retry topic in enode server.
...
...
@@ -64,7 +67,7 @@ public interface EnodeService {
* @throws RemotingSendRequestException
* @throws RemotingConnectException
*/
RemotingCommand
creatRetryTopic
(
String
enodeName
,
RemotingCommand
creatRetryTopic
(
final
RemotingChannel
remotingChannel
,
String
enodeName
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
;
/**
...
...
@@ -98,20 +101,24 @@ public interface EnodeService {
* @param queueId QueueId of related topic.
* @param offset Current offset of target queue of subscribed topic.
*/
void
persistOffset
(
String
enodeName
,
String
groupName
,
String
topic
,
int
queueId
,
long
offset
);
void
persistOffset
(
final
RemotingChannel
remotingChannel
,
String
enodeName
,
String
groupName
,
String
topic
,
int
queueId
,
long
offset
);
RemotingCommand
loadOffset
(
String
enodeName
,
String
consumerGroup
,
String
topic
,
int
queueId
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
;
long
queryOffset
(
String
enodeName
,
String
consumerGroup
,
String
topic
,
int
queueId
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
;
RemotingCommand
getMaxOffsetInQueue
(
String
enodeName
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
long
getMaxOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
;
RemotingCommand
getMinOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
)
throws
InterruptedException
,
RemotingTimeoutException
,
long
getMinOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
;
RemotingCommand
getOffsetByTimestamp
(
String
enodeName
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
;
long
getOffsetByTimestamp
(
String
enodeName
,
String
topic
,
int
queueId
,
long
timestamp
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
;
}
snode/src/main/java/org/apache/rocketmq/snode/service/impl/ClientServiceImpl.java
浏览文件 @
14a75e5c
...
...
@@ -16,6 +16,7 @@
*/
package
org.apache.rocketmq.snode.service.impl
;
import
java.util.List
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
...
...
snode/src/main/java/org/apache/rocketmq/snode/service/impl/LocalEnodeServiceImpl.java
0 → 100644
浏览文件 @
14a75e5c
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.snode.service.impl
;
import
java.util.concurrent.CompletableFuture
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.common.constant.LoggerName
;
import
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.RemotingChannel
;
import
org.apache.rocketmq.remoting.netty.CodecHelper
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.snode.service.EnodeService
;
public
class
LocalEnodeServiceImpl
implements
EnodeService
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
SNODE_LOGGER_NAME
);
private
BrokerController
brokerController
;
public
LocalEnodeServiceImpl
(
BrokerController
brokerController
)
{
this
.
brokerController
=
brokerController
;
}
@Override
public
void
sendHeartbeat
(
RemotingCommand
remotingCommand
)
{
return
;
}
@Override
public
CompletableFuture
<
RemotingCommand
>
sendMessage
(
final
RemotingChannel
remotingChannel
,
String
enodeName
,
RemotingCommand
request
)
{
CompletableFuture
<
RemotingCommand
>
completableFuture
=
new
CompletableFuture
<>();
try
{
log
.
debug
(
"Send message request:{}"
,
request
);
RemotingCommand
remotingCommand
=
this
.
brokerController
.
getSendProcessor
().
processRequest
(
remotingChannel
,
request
);
CodecHelper
.
encodeHeader
(
remotingCommand
);
completableFuture
.
complete
(
remotingCommand
);
}
catch
(
Exception
ex
)
{
log
.
error
(
"[Local]Request local enode send message error"
,
ex
);
completableFuture
.
completeExceptionally
(
ex
);
}
return
completableFuture
;
}
@Override
public
CompletableFuture
<
RemotingCommand
>
pullMessage
(
RemotingChannel
remotingChannel
,
String
enodeName
,
RemotingCommand
request
)
{
CompletableFuture
<
RemotingCommand
>
completableFuture
=
new
CompletableFuture
<>();
try
{
RemotingCommand
response
=
this
.
brokerController
.
getSnodePullMessageProcessor
().
processRequest
(
remotingChannel
,
request
);
completableFuture
.
complete
(
response
);
}
catch
(
Exception
ex
)
{
log
.
error
(
"[Local]Request local enode pull message error"
,
ex
);
completableFuture
.
completeExceptionally
(
ex
);
}
return
completableFuture
;
}
@Override
public
RemotingCommand
creatRetryTopic
(
RemotingChannel
remotingChannel
,
String
enodeName
,
RemotingCommand
request
)
{
try
{
return
this
.
brokerController
.
getClientManageProcessor
().
processRequest
(
remotingChannel
,
request
);
}
catch
(
Exception
ex
)
{
log
.
error
(
"[Local]Request create retry topic error"
,
ex
);
}
return
null
;
}
@Override
public
void
updateEnodeAddress
(
String
clusterName
)
{
}
@Override
public
boolean
persistSubscriptionGroupConfig
(
final
SubscriptionGroupConfig
subscriptionGroupConfig
)
{
boolean
persist
=
false
;
if
(
subscriptionGroupConfig
!=
null
)
{
this
.
brokerController
.
getSubscriptionGroupManager
().
updateSubscriptionGroupConfig
(
subscriptionGroupConfig
);
persist
=
true
;
}
return
persist
;
}
@Override
public
void
persistOffset
(
RemotingChannel
remotingChannel
,
String
enodeName
,
String
groupName
,
String
topic
,
int
queueId
,
long
offset
)
{
try
{
// UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
// requestHeader.setConsumerGroup(groupName);
// requestHeader.setTopic(topic);
// requestHeader.setQueueId(queueId);
// requestHeader.setCommitOffset(offset);
// RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);
// this.brokerController.getConsumerManageProcessor().processRequest(remotingChannel, request);
this
.
brokerController
.
getConsumerOffsetManager
().
commitOffset
(
remotingChannel
.
remoteAddress
().
toString
(),
groupName
,
topic
,
queueId
,
offset
);
}
catch
(
Exception
ex
)
{
log
.
error
(
"[Local]Persist offset to Enode error group: [{}], topic: [{}] queue: [{}]!"
,
ex
,
groupName
,
topic
,
queueId
);
}
}
@Override
public
long
queryOffset
(
String
enodeName
,
String
consumerGroup
,
String
topic
,
int
queueId
)
{
return
this
.
brokerController
.
getConsumerOffsetManager
().
queryOffset
(
consumerGroup
,
topic
,
queueId
);
}
@Override
public
long
getMaxOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
,
RemotingCommand
request
)
{
return
this
.
brokerController
.
getMessageStore
().
getMaxOffsetInQueue
(
topic
,
queueId
);
}
@Override
public
long
getMinOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
,
RemotingCommand
request
)
{
return
this
.
brokerController
.
getMessageStore
().
getMinOffsetInQueue
(
topic
,
queueId
);
}
@Override
public
long
getOffsetByTimestamp
(
String
enodeName
,
String
topic
,
int
queueId
,
long
timestamp
,
RemotingCommand
request
)
{
return
this
.
brokerController
.
getMessageStore
().
getOffsetInQueueByTime
(
topic
,
queueId
,
timestamp
);
}
}
snode/src/main/java/org/apache/rocketmq/snode/service/impl/PushServiceImpl.java
浏览文件 @
14a75e5c
...
...
@@ -95,7 +95,7 @@ public class PushServiceImpl implements PushService {
messageExt
.
setFlag
(
sendMessageRequestHeader
.
getFlag
());
messageExt
.
setBody
(
message
);
messageExt
.
setBodyCRC
(
UtilAll
.
crc32
(
message
));
log
.
debug
(
"MessageExt:{}"
,
messageExt
);
log
.
info
(
"MessageExt:{}"
,
messageExt
);
return
messageExt
;
}
...
...
@@ -103,7 +103,6 @@ public class PushServiceImpl implements PushService {
public
void
run
()
{
if
(!
canceled
.
get
())
{
try
{
log
.
debug
(
"sendMessageResponse: {}"
,
sendMessageResponse
);
SendMessageResponseHeader
sendMessageResponseHeader
=
(
SendMessageResponseHeader
)
sendMessageResponse
.
decodeCommandCustomHeader
(
SendMessageResponseHeader
.
class
);
log
.
debug
(
"sendMessageResponseHeader: {}"
,
sendMessageResponseHeader
);
MessageQueue
messageQueue
=
new
MessageQueue
(
sendMessageRequestHeader
.
getTopic
(),
sendMessageRequestHeader
.
getEnodeName
(),
sendMessageRequestHeader
.
getQueueId
());
...
...
@@ -118,6 +117,7 @@ public class PushServiceImpl implements PushService {
MessageExt
messageExt
=
buildMessageExt
(
sendMessageResponseHeader
,
message
,
sendMessageRequestHeader
);
pushMessage
.
setBody
(
MessageDecoder
.
encode
(
messageExt
,
false
));
for
(
RemotingChannel
remotingChannel
:
consumerTable
)
{
log
.
info
(
"Push message pushMessage:{} to remotingChannel: {}"
,
pushMessage
,
remotingChannel
);
Client
client
=
null
;
if
(
remotingChannel
instanceof
NettyChannelImpl
)
{
Channel
channel
=
((
NettyChannelImpl
)
remotingChannel
).
getChannel
();
...
...
snode/src/main/java/org/apache/rocketmq/snode/service/impl/EnodeServiceImpl.java
→
snode/src/main/java/org/apache/rocketmq/snode/service/impl/
Remote
EnodeServiceImpl.java
浏览文件 @
14a75e5c
...
...
@@ -28,13 +28,17 @@ import org.apache.rocketmq.common.constant.LoggerName;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.body.ClusterInfo
;
import
org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader
;
import
org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader
;
import
org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader
;
import
org.apache.rocketmq.common.subscription.SubscriptionGroupConfig
;
import
org.apache.rocketmq.logging.InternalLogger
;
import
org.apache.rocketmq.logging.InternalLoggerFactory
;
import
org.apache.rocketmq.remoting.InvokeCallback
;
import
org.apache.rocketmq.remoting.RemotingChannel
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.exception.RemotingConnectException
;
import
org.apache.rocketmq.remoting.exception.RemotingSendRequestException
;
...
...
@@ -46,7 +50,7 @@ import org.apache.rocketmq.snode.SnodeController;
import
org.apache.rocketmq.snode.constant.SnodeConstant
;
import
org.apache.rocketmq.snode.service.EnodeService
;
public
class
EnodeServiceImpl
implements
EnodeService
{
public
class
Remote
EnodeServiceImpl
implements
EnodeService
{
private
static
final
InternalLogger
log
=
InternalLoggerFactory
.
getLogger
(
LoggerName
.
SNODE_LOGGER_NAME
);
private
SnodeController
snodeController
;
...
...
@@ -54,7 +58,7 @@ public class EnodeServiceImpl implements EnodeService {
private
final
ConcurrentMap
<
String
/* Broker Name */
,
HashMap
<
Long
/* brokerId */
,
String
/* address */
>>
enodeTable
=
new
ConcurrentHashMap
<>();
public
EnodeServiceImpl
(
SnodeController
snodeController
)
{
public
Remote
EnodeServiceImpl
(
SnodeController
snodeController
)
{
this
.
snodeController
=
snodeController
;
}
...
...
@@ -73,7 +77,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public
CompletableFuture
<
RemotingCommand
>
pullMessage
(
final
String
enodeName
,
final
RemotingCommand
request
)
{
public
CompletableFuture
<
RemotingCommand
>
pullMessage
(
final
RemotingChannel
remotingChannel
,
final
String
enodeName
,
final
RemotingCommand
request
)
{
CompletableFuture
<
RemotingCommand
>
future
=
new
CompletableFuture
<>();
try
{
...
...
@@ -103,7 +108,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public
CompletableFuture
<
RemotingCommand
>
sendMessage
(
String
enodeName
,
RemotingCommand
request
)
{
public
CompletableFuture
<
RemotingCommand
>
sendMessage
(
final
RemotingChannel
remotingChannel
,
String
enodeName
,
RemotingCommand
request
)
{
CompletableFuture
<
RemotingCommand
>
future
=
new
CompletableFuture
<>();
try
{
String
enodeAddress
=
this
.
snodeController
.
getNnodeService
().
getAddressByEnodeName
(
enodeName
,
false
);
...
...
@@ -152,7 +158,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public
boolean
persistSubscriptionGroupConfig
(
SubscriptionGroupConfig
subscriptionGroupConfig
)
{
public
boolean
persistSubscriptionGroupConfig
(
SubscriptionGroupConfig
subscriptionGroupConfig
)
{
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
UPDATE_AND_CREATE_SUBSCRIPTIONGROUP
,
null
);
boolean
persist
=
false
;
for
(
Map
.
Entry
<
String
,
HashMap
<
Long
,
String
>>
entry
:
enodeTable
.
entrySet
())
{
...
...
@@ -177,7 +184,8 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public
void
persistOffset
(
String
enodeName
,
String
groupName
,
String
topic
,
int
queueId
,
long
offset
)
{
public
void
persistOffset
(
final
RemotingChannel
remotingChannel
,
String
enodeName
,
String
groupName
,
String
topic
,
int
queueId
,
long
offset
)
{
try
{
String
address
=
this
.
snodeController
.
getNnodeService
().
getAddressByEnodeName
(
enodeName
,
false
);
UpdateConsumerOffsetRequestHeader
requestHeader
=
new
UpdateConsumerOffsetRequestHeader
();
...
...
@@ -194,22 +202,22 @@ public class EnodeServiceImpl implements EnodeService {
}
@Override
public
RemotingCommand
getMinOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
GetMinOffsetRequestHeader
requestHeader
=
new
GetMinOffsetRequestHeader
();
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setQueueId
(
queueId
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
GET_MIN_OFFSET
,
requestHeader
);
public
long
getMinOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
{
String
addr
=
this
.
snodeController
.
getNnodeService
().
getAddressByEnodeName
(
enodeName
,
false
);
return
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
MixAll
.
brokerVIPChannel
(
snodeController
.
getSnodeConfig
().
isVipChannelEnabled
(),
addr
),
RemotingCommand
remotingCommand
=
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
MixAll
.
brokerVIPChannel
(
snodeController
.
getSnodeConfig
().
isVipChannelEnabled
(),
addr
),
request
,
SnodeConstant
.
DEFAULT_TIMEOUT_MILLS
);
GetMinOffsetResponseHeader
responseHeader
=
(
GetMinOffsetResponseHeader
)
remotingCommand
.
decodeCommandCustomHeader
(
GetMinOffsetResponseHeader
.
class
);
return
responseHeader
.
getOffset
();
}
@Override
public
RemotingCommand
load
Offset
(
String
enodeName
,
String
consumerGroup
,
String
topic
,
public
long
query
Offset
(
String
enodeName
,
String
consumerGroup
,
String
topic
,
int
queueId
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
{
QueryConsumerOffsetRequestHeader
requestHeader
=
new
QueryConsumerOffsetRequestHeader
();
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setConsumerGroup
(
consumerGroup
);
...
...
@@ -217,28 +225,39 @@ public class EnodeServiceImpl implements EnodeService {
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
QUERY_CONSUMER_OFFSET
,
requestHeader
);
String
addr
=
this
.
snodeController
.
getNnodeService
().
getAddressByEnodeName
(
enodeName
,
false
);
return
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
MixAll
.
brokerVIPChannel
(
this
.
snodeController
.
getSnodeConfig
().
isVipChannelEnabled
(),
addr
),
RemotingCommand
remotingCommand
=
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
MixAll
.
brokerVIPChannel
(
this
.
snodeController
.
getSnodeConfig
().
isVipChannelEnabled
(),
addr
),
request
,
SnodeConstant
.
DEFAULT_TIMEOUT_MILLS
);
QueryConsumerOffsetResponseHeader
responseHeader
=
(
QueryConsumerOffsetResponseHeader
)
remotingCommand
.
decodeCommandCustomHeader
(
QueryConsumerOffsetResponseHeader
.
class
);
return
responseHeader
.
getOffset
();
}
@Override
public
RemotingCommand
getMaxOffsetInQueue
(
String
enodeName
,
public
long
getMaxOffsetInQueue
(
String
enodeName
,
String
topic
,
int
queueId
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
{
String
address
=
this
.
snodeController
.
getNnodeService
().
getAddressByEnodeName
(
enodeName
,
false
);
return
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
address
,
RemotingCommand
remotingCommand
=
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
address
,
request
,
SnodeConstant
.
DEFAULT_TIMEOUT_MILLS
);
GetMaxOffsetResponseHeader
responseHeader
=
(
GetMaxOffsetResponseHeader
)
remotingCommand
.
decodeCommandCustomHeader
(
GetMaxOffsetResponseHeader
.
class
);
return
responseHeader
.
getOffset
();
}
@Override
public
RemotingCommand
getOffsetByTimestamp
(
String
enodeName
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
public
long
getOffsetByTimestamp
(
String
enodeName
,
String
topic
,
int
queueId
,
long
timestamp
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
,
RemotingCommandException
{
String
address
=
this
.
snodeController
.
getNnodeService
().
getAddressByEnodeName
(
enodeName
,
false
);
return
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
address
,
RemotingCommand
remotingCommand
=
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
address
,
request
,
SnodeConstant
.
DEFAULT_TIMEOUT_MILLS
);
SearchOffsetResponseHeader
responseHeader
=
(
SearchOffsetResponseHeader
)
remotingCommand
.
decodeCommandCustomHeader
(
SearchOffsetResponseHeader
.
class
);
return
responseHeader
.
getOffset
();
}
@Override
public
RemotingCommand
creatRetryTopic
(
String
enodeName
,
public
RemotingCommand
creatRetryTopic
(
final
RemotingChannel
remotingChannel
,
String
enodeName
,
RemotingCommand
request
)
throws
InterruptedException
,
RemotingTimeoutException
,
RemotingSendRequestException
,
RemotingConnectException
{
String
address
=
this
.
snodeController
.
getNnodeService
().
getAddressByEnodeName
(
enodeName
,
false
);
return
this
.
snodeController
.
getRemotingClient
().
invokeSync
(
address
,
...
...
snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
浏览文件 @
14a75e5c
...
...
@@ -74,7 +74,7 @@ public class SendMessageProcessorTest {
public
void
testSendMessageV2ProcessRequest
()
throws
RemotingCommandException
{
CompletableFuture
<
RemotingCommand
>
future
=
new
CompletableFuture
<>();
RemotingCommand
request
=
createSendMesssageV2Command
();
when
(
this
.
snodeController
.
getEnodeService
().
sendMessage
(
anyString
(),
any
(
RemotingCommand
.
class
))).
thenReturn
(
future
);
when
(
this
.
snodeController
.
getEnodeService
().
sendMessage
(
null
,
anyString
(),
any
(
RemotingCommand
.
class
))).
thenReturn
(
future
);
sendMessageProcessor
.
processRequest
(
remotingChannel
,
request
);
}
...
...
@@ -83,7 +83,7 @@ public class SendMessageProcessorTest {
snodeController
.
setEnodeService
(
enodeService
);
CompletableFuture
<
RemotingCommand
>
future
=
new
CompletableFuture
<>();
RemotingCommand
request
=
createSendBatchMesssageCommand
();
when
(
this
.
snodeController
.
getEnodeService
().
sendMessage
(
anyString
(),
any
(
RemotingCommand
.
class
))).
thenReturn
(
future
);
when
(
this
.
snodeController
.
getEnodeService
().
sendMessage
(
null
,
anyString
(),
any
(
RemotingCommand
.
class
))).
thenReturn
(
future
);
sendMessageProcessor
.
processRequest
(
remotingChannel
,
request
);
}
...
...
snode/src/test/java/org/apache/rocketmq/snode/service/EnodeServiceImplTest.java
→
snode/src/test/java/org/apache/rocketmq/snode/service/
Remote
EnodeServiceImplTest.java
浏览文件 @
14a75e5c
...
...
@@ -30,7 +30,7 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import
org.apache.rocketmq.remoting.transport.rocketmq.NettyRemotingClient
;
import
org.apache.rocketmq.snode.SnodeController
;
import
org.apache.rocketmq.snode.SnodeTestBase
;
import
org.apache.rocketmq.snode.service.impl.EnodeServiceImpl
;
import
org.apache.rocketmq.snode.service.impl.
Remote
EnodeServiceImpl
;
import
org.apache.rocketmq.store.GetMessageResult
;
import
org.apache.rocketmq.store.GetMessageStatus
;
import
org.junit.Before
;
...
...
@@ -51,7 +51,7 @@ import static org.mockito.Mockito.doAnswer;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
EnodeServiceImplTest
extends
SnodeTestBase
{
public
class
Remote
EnodeServiceImplTest
extends
SnodeTestBase
{
private
EnodeService
enodeService
;
...
...
@@ -74,7 +74,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
public
void
init
()
{
snodeController
.
setNnodeService
(
nnodeService
);
snodeController
.
setRemotingClient
(
remotingClient
);
enodeService
=
new
EnodeServiceImpl
(
snodeController
);
enodeService
=
new
Remote
EnodeServiceImpl
(
snodeController
);
}
@Test
...
...
@@ -93,7 +93,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
return
null
;
}
}).
when
(
remotingClient
).
invokeAsync
(
anyString
(),
any
(
RemotingCommand
.
class
),
anyLong
(),
any
(
InvokeCallback
.
class
));
RemotingCommand
response
=
enodeService
.
sendMessage
(
enodeName
,
createSendMesssageCommand
(
group
,
topic
)).
get
(
3000L
,
TimeUnit
.
MILLISECONDS
);
RemotingCommand
response
=
enodeService
.
sendMessage
(
null
,
enodeName
,
createSendMesssageCommand
(
group
,
topic
)).
get
(
3000L
,
TimeUnit
.
MILLISECONDS
);
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
...
...
@@ -118,7 +118,7 @@ public class EnodeServiceImplTest extends SnodeTestBase {
return
null
;
}
}).
when
(
remotingClient
).
invokeAsync
(
anyString
(),
any
(
RemotingCommand
.
class
),
anyLong
(),
any
(
InvokeCallback
.
class
));
RemotingCommand
response
=
enodeService
.
pullMessage
(
enodeName
,
createPullMessage
()).
get
(
3000L
,
TimeUnit
.
MILLISECONDS
);
RemotingCommand
response
=
enodeService
.
pullMessage
(
null
,
enodeName
,
createPullMessage
()).
get
(
3000L
,
TimeUnit
.
MILLISECONDS
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录