Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
f17da4f4
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
f17da4f4
编写于
1月 21, 2017
作者:
Y
yukon
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ROCKETMQ-51] Add unit tests for PullMessageProcessor
上级
264a0560
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
199 addition
and
10 deletion
+199
-10
broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
...e/rocketmq/broker/processor/PullMessageProcessorTest.java
+197
-0
broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
...e/rocketmq/broker/processor/SendMessageProcessorTest.java
+2
-10
未找到文件。
broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
0 → 100644
浏览文件 @
f17da4f4
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.broker.processor
;
import
io.netty.channel.Channel
;
import
io.netty.channel.ChannelHandlerContext
;
import
java.net.InetSocketAddress
;
import
java.util.HashSet
;
import
java.util.Set
;
import
org.apache.rocketmq.broker.BrokerController
;
import
org.apache.rocketmq.broker.client.ClientChannelInfo
;
import
org.apache.rocketmq.common.BrokerConfig
;
import
org.apache.rocketmq.common.TopicConfig
;
import
org.apache.rocketmq.common.consumer.ConsumeFromWhere
;
import
org.apache.rocketmq.common.protocol.RequestCode
;
import
org.apache.rocketmq.common.protocol.ResponseCode
;
import
org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumeType
;
import
org.apache.rocketmq.common.protocol.heartbeat.ConsumerData
;
import
org.apache.rocketmq.common.protocol.heartbeat.MessageModel
;
import
org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData
;
import
org.apache.rocketmq.remoting.exception.RemotingCommandException
;
import
org.apache.rocketmq.remoting.netty.NettyClientConfig
;
import
org.apache.rocketmq.remoting.netty.NettyServerConfig
;
import
org.apache.rocketmq.remoting.protocol.RemotingCommand
;
import
org.apache.rocketmq.store.GetMessageResult
;
import
org.apache.rocketmq.store.GetMessageStatus
;
import
org.apache.rocketmq.store.MessageStore
;
import
org.apache.rocketmq.store.config.MessageStoreConfig
;
import
org.junit.Before
;
import
org.junit.Test
;
import
org.junit.runner.RunWith
;
import
org.mockito.Mock
;
import
org.mockito.Spy
;
import
org.mockito.junit.MockitoJUnitRunner
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyInt
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyLong
;
import
static
org
.
mockito
.
ArgumentMatchers
.
anyString
;
import
static
org
.
mockito
.
Mockito
.
mock
;
import
static
org
.
mockito
.
Mockito
.
when
;
@RunWith
(
MockitoJUnitRunner
.
class
)
public
class
PullMessageProcessorTest
{
private
PullMessageProcessor
pullMessageProcessor
;
@Spy
private
BrokerController
brokerController
=
new
BrokerController
(
new
BrokerConfig
(),
new
NettyServerConfig
(),
new
NettyClientConfig
(),
new
MessageStoreConfig
());
@Mock
private
ChannelHandlerContext
handlerContext
;
@Mock
private
MessageStore
messageStore
;
private
ClientChannelInfo
clientChannelInfo
;
private
String
group
=
"FooBarGroup"
;
private
String
topic
=
"FooBar"
;
@Before
public
void
init
()
{
brokerController
.
setMessageStore
(
messageStore
);
pullMessageProcessor
=
new
PullMessageProcessor
(
brokerController
);
Channel
mockChannel
=
mock
(
Channel
.
class
);
when
(
mockChannel
.
remoteAddress
()).
thenReturn
(
new
InetSocketAddress
(
1024
));
when
(
handlerContext
.
channel
()).
thenReturn
(
mockChannel
);
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
put
(
topic
,
new
TopicConfig
());
clientChannelInfo
=
new
ClientChannelInfo
(
mockChannel
);
ConsumerData
consumerData
=
createConsumerData
();
brokerController
.
getConsumerManager
().
registerConsumer
(
consumerData
.
getGroupName
(),
clientChannelInfo
,
consumerData
.
getConsumeType
(),
consumerData
.
getMessageModel
(),
consumerData
.
getConsumeFromWhere
(),
consumerData
.
getSubscriptionDataSet
(),
false
);
}
@Test
public
void
testProcessRequest_TopicNotExist
()
throws
RemotingCommandException
{
brokerController
.
getTopicConfigManager
().
getTopicConfigTable
().
remove
(
topic
);
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
TOPIC_NOT_EXIST
);
assertThat
(
response
.
getRemark
()).
contains
(
"topic["
+
topic
+
"] not exist"
);
}
@Test
public
void
testProcessRequest_SubNotExist
()
throws
RemotingCommandException
{
brokerController
.
getConsumerManager
().
unregisterConsumer
(
group
,
clientChannelInfo
,
false
);
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUBSCRIPTION_NOT_EXIST
);
assertThat
(
response
.
getRemark
()).
contains
(
"consumer's group info not exist"
);
}
@Test
public
void
testProcessRequest_SubNotLatest
()
throws
RemotingCommandException
{
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
request
.
addExtField
(
"subVersion"
,
String
.
valueOf
(
101
));
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUBSCRIPTION_NOT_LATEST
);
assertThat
(
response
.
getRemark
()).
contains
(
"subscription not latest"
);
}
@Test
public
void
testProcessRequest_Found
()
throws
RemotingCommandException
{
GetMessageResult
getMessageResult
=
createGetMessageResult
();
when
(
messageStore
.
getMessage
(
anyString
(),
anyString
(),
anyInt
(),
anyLong
(),
anyInt
(),
any
(
SubscriptionData
.
class
))).
thenReturn
(
getMessageResult
);
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
SUCCESS
);
}
@Test
public
void
testProcessRequest_MsgWasRemoving
()
throws
RemotingCommandException
{
GetMessageResult
getMessageResult
=
createGetMessageResult
();
getMessageResult
.
setStatus
(
GetMessageStatus
.
MESSAGE_WAS_REMOVING
);
when
(
messageStore
.
getMessage
(
anyString
(),
anyString
(),
anyInt
(),
anyLong
(),
anyInt
(),
any
(
SubscriptionData
.
class
))).
thenReturn
(
getMessageResult
);
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
PULL_RETRY_IMMEDIATELY
);
}
@Test
public
void
testProcessRequest_NoMsgInQueue
()
throws
RemotingCommandException
{
GetMessageResult
getMessageResult
=
createGetMessageResult
();
getMessageResult
.
setStatus
(
GetMessageStatus
.
NO_MESSAGE_IN_QUEUE
);
when
(
messageStore
.
getMessage
(
anyString
(),
anyString
(),
anyInt
(),
anyLong
(),
anyInt
(),
any
(
SubscriptionData
.
class
))).
thenReturn
(
getMessageResult
);
final
RemotingCommand
request
=
createPullMsgCommand
(
RequestCode
.
PULL_MESSAGE
);
RemotingCommand
response
=
pullMessageProcessor
.
processRequest
(
handlerContext
,
request
);
assertThat
(
response
).
isNotNull
();
assertThat
(
response
.
getCode
()).
isEqualTo
(
ResponseCode
.
PULL_OFFSET_MOVED
);
}
private
RemotingCommand
createPullMsgCommand
(
int
requestCode
)
{
PullMessageRequestHeader
requestHeader
=
new
PullMessageRequestHeader
();
requestHeader
.
setCommitOffset
(
123L
);
requestHeader
.
setConsumerGroup
(
group
);
requestHeader
.
setMaxMsgNums
(
100
);
requestHeader
.
setQueueId
(
1
);
requestHeader
.
setQueueOffset
(
456L
);
requestHeader
.
setSubscription
(
"*"
);
requestHeader
.
setTopic
(
topic
);
requestHeader
.
setSysFlag
(
0
);
requestHeader
.
setSubVersion
(
100L
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
requestCode
,
requestHeader
);
request
.
makeCustomHeaderToNet
();
return
request
;
}
private
ConsumerData
createConsumerData
()
{
ConsumerData
consumerData
=
new
ConsumerData
();
consumerData
.
setConsumeFromWhere
(
ConsumeFromWhere
.
CONSUME_FROM_FIRST_OFFSET
);
consumerData
.
setConsumeType
(
ConsumeType
.
CONSUME_PASSIVELY
);
consumerData
.
setGroupName
(
group
);
consumerData
.
setMessageModel
(
MessageModel
.
CLUSTERING
);
Set
<
SubscriptionData
>
subscriptionDataSet
=
new
HashSet
<>();
SubscriptionData
subscriptionData
=
new
SubscriptionData
();
subscriptionData
.
setTopic
(
topic
);
subscriptionData
.
setSubString
(
"*"
);
subscriptionData
.
setSubVersion
(
100L
);
subscriptionDataSet
.
add
(
subscriptionData
);
consumerData
.
setSubscriptionDataSet
(
subscriptionDataSet
);
return
consumerData
;
}
private
GetMessageResult
createGetMessageResult
()
{
GetMessageResult
getMessageResult
=
new
GetMessageResult
();
getMessageResult
.
setStatus
(
GetMessageStatus
.
FOUND
);
getMessageResult
.
setMinOffset
(
100
);
getMessageResult
.
setMaxOffset
(
1024
);
getMessageResult
.
setNextBeginOffset
(
516
);
return
getMessageResult
;
}
}
\ No newline at end of file
broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
浏览文件 @
f17da4f4
...
@@ -153,13 +153,7 @@ public class SendMessageProcessorTest {
...
@@ -153,13 +153,7 @@ public class SendMessageProcessorTest {
requestHeader
.
setReconsumeTimes
(
0
);
requestHeader
.
setReconsumeTimes
(
0
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
requestCode
,
requestHeader
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
requestCode
,
requestHeader
);
request
.
addExtField
(
"queueId"
,
String
.
valueOf
(
requestHeader
.
getQueueId
()));
request
.
makeCustomHeaderToNet
();
request
.
addExtField
(
"topic"
,
String
.
valueOf
(
requestHeader
.
getTopic
()));
request
.
addExtField
(
"defaultTopicQueueNums"
,
String
.
valueOf
(
requestHeader
.
getDefaultTopicQueueNums
()));
request
.
addExtField
(
"defaultTopic"
,
requestHeader
.
getDefaultTopic
());
request
.
addExtField
(
"sysFlag"
,
String
.
valueOf
(
requestHeader
.
getSysFlag
()));
request
.
addExtField
(
"flag"
,
String
.
valueOf
(
requestHeader
.
getFlag
()));
request
.
addExtField
(
"bornTimestamp"
,
String
.
valueOf
(
requestHeader
.
getBornTimestamp
()));
return
request
;
return
request
;
}
}
...
@@ -172,9 +166,7 @@ public class SendMessageProcessorTest {
...
@@ -172,9 +166,7 @@ public class SendMessageProcessorTest {
requestHeader
.
setOffset
(
123L
);
requestHeader
.
setOffset
(
123L
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
requestCode
,
requestHeader
);
RemotingCommand
request
=
RemotingCommand
.
createRequestCommand
(
requestCode
,
requestHeader
);
request
.
addExtField
(
"group"
,
requestHeader
.
getGroup
());
request
.
makeCustomHeaderToNet
();
request
.
addExtField
(
"offset"
,
String
.
valueOf
(
requestHeader
.
getOffset
()));
request
.
addExtField
(
"delayLevel"
,
String
.
valueOf
(
requestHeader
.
getDelayLevel
()));
return
request
;
return
request
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录