Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
cf3e06ca
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 3 年多
通知
295
Star
16140
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
cf3e06ca
编写于
2月 14, 2019
作者:
H
huzongtang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[RIP-11]add SEND_BATCH_MESSAGE request code to related processor.
上级
806f36b4
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
32 addition
and
16 deletion
+32
-16
remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
...he/rocketmq/remoting/transport/http2/Http2ClientImpl.java
+3
-3
remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
...he/rocketmq/remoting/transport/http2/Http2ServerImpl.java
+6
-6
snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
...apache/rocketmq/snode/processor/SendMessageProcessor.java
+4
-2
snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
...he/rocketmq/snode/processor/SendMessageProcessorTest.java
+19
-5
未找到文件。
remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ClientImpl.java
浏览文件 @
cf3e06ca
...
@@ -82,13 +82,13 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
...
@@ -82,13 +82,13 @@ public class Http2ClientImpl extends NettyRemotingClientAbstract implements Remo
public
RemotingClient
init
(
ClientConfig
clientConfig
,
ChannelEventListener
channelEventListener
)
{
public
RemotingClient
init
(
ClientConfig
clientConfig
,
ChannelEventListener
channelEventListener
)
{
this
.
nettyClientConfig
=
clientConfig
;
this
.
nettyClientConfig
=
clientConfig
;
this
.
channelEventListener
=
channelEventListener
;
this
.
channelEventListener
=
channelEventListener
;
this
.
ioGroup
=
new
NioEventLoopGroup
(
clientConfig
.
getClientWorkerThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"NettyClientEpollIoThreads"
,
this
.
ioGroup
=
new
NioEventLoopGroup
(
clientConfig
.
getClientWorkerThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"
Http2
NettyClientEpollIoThreads"
,
clientConfig
.
getClientWorkerThreads
()));
clientConfig
.
getClientWorkerThreads
()));
this
.
publicExecutor
=
ThreadUtils
.
newFixedThreadPool
(
this
.
publicExecutor
=
ThreadUtils
.
newFixedThreadPool
(
clientConfig
.
getClientCallbackExecutorThreads
(),
clientConfig
.
getClientCallbackExecutorThreads
(),
10000
,
"Remoting-PublicExecutor"
,
true
);
10000
,
"
Http2
Remoting-PublicExecutor"
,
true
);
this
.
defaultEventExecutorGroup
=
new
DefaultEventExecutorGroup
(
clientConfig
.
getClientWorkerThreads
(),
this
.
defaultEventExecutorGroup
=
new
DefaultEventExecutorGroup
(
clientConfig
.
getClientWorkerThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"NettyClientWorkerThreads"
,
clientConfig
.
getClientWorkerThreads
()));
ThreadUtils
.
newGenericThreadFactory
(
"
Http2
NettyClientWorkerThreads"
,
clientConfig
.
getClientWorkerThreads
()));
buildHttp2SslClientContext
();
buildHttp2SslClientContext
();
return
this
;
return
this
;
}
}
...
...
remoting/src/main/java/org/apache/rocketmq/remoting/transport/http2/Http2ServerImpl.java
浏览文件 @
cf3e06ca
...
@@ -97,23 +97,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
...
@@ -97,23 +97,23 @@ public class Http2ServerImpl extends NettyRemotingServerAbstract implements Remo
this
.
channelEventListener
=
channelEventListener
;
this
.
channelEventListener
=
channelEventListener
;
this
.
publicExecutor
=
ThreadUtils
.
newFixedThreadPool
(
this
.
publicExecutor
=
ThreadUtils
.
newFixedThreadPool
(
serverConfig
.
getServerCallbackExecutorThreads
(),
serverConfig
.
getServerCallbackExecutorThreads
(),
10000
,
"Remoting-PublicExecutor"
,
true
);
10000
,
"
Http2
Remoting-PublicExecutor"
,
true
);
if
(
JvmUtils
.
isLinux
()
&&
this
.
serverConfig
.
isUseEpollNativeSelector
())
{
if
(
JvmUtils
.
isLinux
()
&&
this
.
serverConfig
.
isUseEpollNativeSelector
())
{
this
.
ioGroup
=
new
EpollEventLoopGroup
(
serverConfig
.
getServerSelectorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"NettyEpollIoThreads"
,
this
.
ioGroup
=
new
EpollEventLoopGroup
(
serverConfig
.
getServerSelectorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"
Http2
NettyEpollIoThreads"
,
serverConfig
.
getServerSelectorThreads
()));
serverConfig
.
getServerSelectorThreads
()));
this
.
bossGroup
=
new
EpollEventLoopGroup
(
serverConfig
.
getServerAcceptorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"NettyBossThreads"
,
this
.
bossGroup
=
new
EpollEventLoopGroup
(
serverConfig
.
getServerAcceptorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"
Http2
NettyBossThreads"
,
serverConfig
.
getServerAcceptorThreads
()));
serverConfig
.
getServerAcceptorThreads
()));
this
.
socketChannelClass
=
EpollServerSocketChannel
.
class
;
this
.
socketChannelClass
=
EpollServerSocketChannel
.
class
;
}
else
{
}
else
{
this
.
bossGroup
=
new
NioEventLoopGroup
(
serverConfig
.
getServerAcceptorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"NettyBossThreads"
,
this
.
bossGroup
=
new
NioEventLoopGroup
(
serverConfig
.
getServerAcceptorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"
Http2
NettyBossThreads"
,
serverConfig
.
getServerAcceptorThreads
()));
serverConfig
.
getServerAcceptorThreads
()));
this
.
ioGroup
=
new
NioEventLoopGroup
(
serverConfig
.
getServerSelectorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"NettyNioIoThreads"
,
this
.
ioGroup
=
new
NioEventLoopGroup
(
serverConfig
.
getServerSelectorThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"
Http2
NettyNioIoThreads"
,
serverConfig
.
getServerSelectorThreads
()));
serverConfig
.
getServerSelectorThreads
()));
this
.
socketChannelClass
=
NioServerSocketChannel
.
class
;
this
.
socketChannelClass
=
NioServerSocketChannel
.
class
;
}
}
this
.
workerGroup
=
new
DefaultEventExecutorGroup
(
serverConfig
.
getServerWorkerThreads
(),
this
.
workerGroup
=
new
DefaultEventExecutorGroup
(
serverConfig
.
getServerWorkerThreads
(),
ThreadUtils
.
newGenericThreadFactory
(
"NettyWorkerThreads"
,
serverConfig
.
getServerWorkerThreads
()));
ThreadUtils
.
newGenericThreadFactory
(
"
Http2
NettyWorkerThreads"
,
serverConfig
.
getServerWorkerThreads
()));
this
.
port
=
nettyServerConfig
.
getListenPort
();
this
.
port
=
nettyServerConfig
.
getListenPort
();
buildHttp2SslServerContext
();
buildHttp2SslServerContext
();
return
this
;
return
this
;
...
...
snode/src/main/java/org/apache/rocketmq/snode/processor/SendMessageProcessor.java
浏览文件 @
cf3e06ca
...
@@ -51,13 +51,15 @@ public class SendMessageProcessor implements RequestProcessor {
...
@@ -51,13 +51,15 @@ public class SendMessageProcessor implements RequestProcessor {
}
}
String
enodeName
;
String
enodeName
;
SendMessageRequestHeaderV2
sendMessageRequestHeaderV2
=
null
;
SendMessageRequestHeaderV2
sendMessageRequestHeaderV2
=
null
;
ConsumerSendMsgBackRequestHeader
consumerSendMsgBackRequestHeader
=
null
;
boolean
isSendBack
=
false
;
boolean
isSendBack
=
false
;
if
(
request
.
getCode
()
==
RequestCode
.
SEND_MESSAGE_V2
)
{
if
(
request
.
getCode
()
==
RequestCode
.
SEND_MESSAGE_V2
||
request
.
getCode
()
==
RequestCode
.
SEND_BATCH_MESSAGE
)
{
sendMessageRequestHeaderV2
=
(
SendMessageRequestHeaderV2
)
request
.
decodeCommandCustomHeader
(
SendMessageRequestHeaderV2
.
class
);
sendMessageRequestHeaderV2
=
(
SendMessageRequestHeaderV2
)
request
.
decodeCommandCustomHeader
(
SendMessageRequestHeaderV2
.
class
);
enodeName
=
sendMessageRequestHeaderV2
.
getN
();
enodeName
=
sendMessageRequestHeaderV2
.
getN
();
}
else
{
}
else
{
isSendBack
=
true
;
isSendBack
=
true
;
ConsumerSendMsgBackRequestHeader
consumerSendMsgBackRequestHeader
=
(
ConsumerSendMsgBackRequestHeader
)
request
.
decodeCommandCustomHeader
(
ConsumerSendMsgBackRequestHeader
.
class
);
consumerSendMsgBackRequestHeader
=
(
ConsumerSendMsgBackRequestHeader
)
request
.
decodeCommandCustomHeader
(
ConsumerSendMsgBackRequestHeader
.
class
);
enodeName
=
consumerSendMsgBackRequestHeader
.
getEnodeName
();
enodeName
=
consumerSendMsgBackRequestHeader
.
getEnodeName
();
}
}
...
...
snode/src/test/java/org/apache/rocketmq/snode/processor/SendMessageProcessorTest.java
浏览文件 @
cf3e06ca
...
@@ -71,12 +71,18 @@ public class SendMessageProcessorTest {
...
@@ -71,12 +71,18 @@ public class SendMessageProcessorTest {
}
}
@Test
@Test
public
void
testProcessRequest
()
throws
RemotingCommandException
{
public
void
testSendMessageV2ProcessRequest
()
throws
RemotingCommandException
{
CompletableFuture
<
RemotingCommand
>
future
=
new
CompletableFuture
<>();
RemotingCommand
request
=
createSendMesssageV2Command
();
when
(
this
.
snodeController
.
getEnodeService
().
sendMessage
(
anyString
(),
any
(
RemotingCommand
.
class
))).
thenReturn
(
future
);
sendMessageProcessor
.
processRequest
(
remotingChannel
,
request
);
}
@Test
public
void
testSendBatchMessageProcessRequest
()
throws
RemotingCommandException
{
snodeController
.
setEnodeService
(
enodeService
);
snodeController
.
setEnodeService
(
enodeService
);
CompletableFuture
<
RemotingCommand
>
future
=
new
CompletableFuture
<>();
CompletableFuture
<
RemotingCommand
>
future
=
new
CompletableFuture
<>();
RemotingCommand
request
=
createSendMesssageCommand
();
RemotingCommand
request
=
createSendBatchMesssageCommand
();
SendMessageRequestHeaderV2
sendMessageRequestHeaderV2
=
(
SendMessageRequestHeaderV2
)
request
.
decodeCommandCustomHeader
(
SendMessageRequestHeaderV2
.
class
);
System
.
out
.
println
(
"sendMessageRequestHeaderV2: "
+
sendMessageRequestHeaderV2
);
when
(
this
.
snodeController
.
getEnodeService
().
sendMessage
(
anyString
(),
any
(
RemotingCommand
.
class
))).
thenReturn
(
future
);
when
(
this
.
snodeController
.
getEnodeService
().
sendMessage
(
anyString
(),
any
(
RemotingCommand
.
class
))).
thenReturn
(
future
);
sendMessageProcessor
.
processRequest
(
remotingChannel
,
request
);
sendMessageProcessor
.
processRequest
(
remotingChannel
,
request
);
}
}
...
@@ -95,7 +101,7 @@ public class SendMessageProcessorTest {
...
@@ -95,7 +101,7 @@ public class SendMessageProcessorTest {
return
requestHeader
;
return
requestHeader
;
}
}
private
RemotingCommand
createSendMesssageCommand
()
{
private
RemotingCommand
createSendMesssage
V2
Command
()
{
SendMessageRequestHeaderV2
sendMessageRequestHeaderV2
=
createSendMsgRequestHeader
();
SendMessageRequestHeaderV2
sendMessageRequestHeaderV2
=
createSendMsgRequestHeader
();
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
SEND_MESSAGE_V2
,
sendMessageRequestHeaderV2
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
SEND_MESSAGE_V2
,
sendMessageRequestHeaderV2
);
request
.
setBody
(
new
byte
[]
{
'a'
});
request
.
setBody
(
new
byte
[]
{
'a'
});
...
@@ -103,6 +109,14 @@ public class SendMessageProcessorTest {
...
@@ -103,6 +109,14 @@ public class SendMessageProcessorTest {
return
request
;
return
request
;
}
}
private
RemotingCommand
createSendBatchMesssageCommand
()
{
SendMessageRequestHeaderV2
sendMessageRequestHeaderV2
=
createSendMsgRequestHeader
();
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
RequestCode
.
SEND_BATCH_MESSAGE
,
sendMessageRequestHeaderV2
);
request
.
setBody
(
new
byte
[]
{
'b'
});
CodecHelper
.
makeCustomHeaderToNet
(
request
);
return
request
;
}
RemotingCommand
createSendMessageResponse
(
int
responseCode
)
{
RemotingCommand
createSendMessageResponse
(
int
responseCode
)
{
return
RemotingCommand
.
createResponseCommand
(
ResponseCode
.
SUCCESS
,
null
);
return
RemotingCommand
.
createResponseCommand
(
ResponseCode
.
SUCCESS
,
null
);
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录