Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Kwan的解忧杂货铺@新空间代码工作室
Rocketmq
提交
703ac00b
R
Rocketmq
项目概览
Kwan的解忧杂货铺@新空间代码工作室
/
Rocketmq
与 Fork 源项目一致
Fork自
Apache RocketMQ / Rocketmq
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
提交
703ac00b
编写于
6月 08, 2017
作者:
D
dongeforever
提交者:
yukon
6月 08, 2017
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[ROCKETMQ-220] Add IT test for Filter By SQL 92, closes
apache/incubator-rocketmq#114
上级
7374914b
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
152 addition
and
2 deletion
+152
-2
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
...ain/java/org/apache/rocketmq/broker/BrokerController.java
+1
-1
example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
.../java/org/apache/rocketmq/example/filter/SqlConsumer.java
+0
-1
test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
...a/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
+42
-0
test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
...ava/org/apache/rocketmq/test/factory/ConsumerFactory.java
+12
-0
test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
...a/org/apache/rocketmq/test/listener/AbstractListener.java
+22
-0
test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
...va/org/apache/rocketmq/test/base/IntegrationTestBase.java
+1
-0
test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
...che/rocketmq/test/client/consumer/filter/SqlFilterIT.java
+74
-0
未找到文件。
broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
浏览文件 @
703ac00b
...
...
@@ -421,7 +421,7 @@ public class BrokerController {
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
HEART_BEAT
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
fastRemotingServer
.
registerProcessor
(
RequestCode
.
UNREGISTER_CLIENT
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
r
emotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
clientProcessor
,
this
.
clientManageExecutor
);
this
.
fastR
emotingServer
.
registerProcessor
(
RequestCode
.
CHECK_CLIENT_CONFIG
,
clientProcessor
,
this
.
clientManageExecutor
);
/**
* ConsumerManageProcessor
...
...
example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
浏览文件 @
703ac00b
...
...
@@ -31,7 +31,6 @@ public class SqlConsumer {
public
static
void
main
(
String
[]
args
)
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_4"
);
try
{
consumer
.
subscribe
(
"TopicTest"
,
MessageSelector
.
bySql
(
"(TAGS is not null and TAGS in ('TagA', 'TagB'))"
+
...
...
test/src/main/java/org/apache/rocketmq/test/client/rmq/RMQSqlConsumer.java
0 → 100644
浏览文件 @
703ac00b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.test.client.rmq
;
import
org.apache.log4j.Logger
;
import
org.apache.rocketmq.client.consumer.MessageSelector
;
import
org.apache.rocketmq.test.listener.AbstractListener
;
public
class
RMQSqlConsumer
extends
RMQNormalConsumer
{
private
static
Logger
logger
=
Logger
.
getLogger
(
RMQSqlConsumer
.
class
);
private
MessageSelector
selector
;
public
RMQSqlConsumer
(
String
nsAddr
,
String
topic
,
MessageSelector
selector
,
String
consumerGroup
,
AbstractListener
listener
)
{
super
(
nsAddr
,
topic
,
"*"
,
consumerGroup
,
listener
);
this
.
selector
=
selector
;
}
@Override
public
void
create
()
{
super
.
create
();
try
{
consumer
.
subscribe
(
topic
,
selector
);
}
catch
(
Exception
e
)
{
logger
.
error
(
"Subscribe Sql Errored"
,
e
);
}
}
}
test/src/main/java/org/apache/rocketmq/test/factory/ConsumerFactory.java
浏览文件 @
703ac00b
...
...
@@ -17,8 +17,10 @@
package
org.apache.rocketmq.test.factory
;
import
org.apache.rocketmq.client.consumer.MessageSelector
;
import
org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer
;
import
org.apache.rocketmq.test.client.rmq.RMQNormalConsumer
;
import
org.apache.rocketmq.test.client.rmq.RMQSqlConsumer
;
import
org.apache.rocketmq.test.listener.AbstractListener
;
public
class
ConsumerFactory
{
...
...
@@ -42,4 +44,14 @@ public class ConsumerFactory {
consumer
.
start
();
return
consumer
;
}
public
static
RMQSqlConsumer
getRMQSqlConsumer
(
String
nsAddr
,
String
consumerGroup
,
String
topic
,
MessageSelector
selector
,
AbstractListener
listner
)
{
RMQSqlConsumer
consumer
=
new
RMQSqlConsumer
(
nsAddr
,
topic
,
selector
,
consumerGroup
,
listner
);
consumer
.
create
();
consumer
.
start
();
return
consumer
;
}
}
test/src/main/java/org/apache/rocketmq/test/listener/AbstractListener.java
浏览文件 @
703ac00b
...
...
@@ -95,6 +95,28 @@ public class AbstractListener extends MQCollector implements MessageListener {
return
sendMsgs
;
}
public
long
waitForMessageConsume
(
int
size
,
int
timeoutMills
)
{
long
curTime
=
System
.
currentTimeMillis
();
while
(
true
)
{
if
(
msgBodys
.
getDataSize
()
>=
size
)
{
break
;
}
if
(
System
.
currentTimeMillis
()
-
curTime
>=
timeoutMills
)
{
logger
.
error
(
String
.
format
(
"timeout but [%s] not recv all send messages!"
,
listnerName
));
break
;
}
else
{
logger
.
info
(
String
.
format
(
"[%s] still [%s] msg not recv!"
,
listnerName
,
size
-
msgBodys
.
getDataSize
()));
TestUtil
.
waitForMonment
(
500
);
}
}
return
msgBodys
.
getDataSize
();
}
public
void
waitForMessageConsume
(
Map
<
Object
,
Object
>
sendMsgIndex
,
int
timeoutMills
)
{
Collection
<
Object
>
notRecvMsgs
=
waitForMessageConsume
(
sendMsgIndex
.
keySet
(),
timeoutMills
);
for
(
Object
object
:
notRecvMsgs
)
{
...
...
test/src/test/java/org/apache/rocketmq/test/base/IntegrationTestBase.java
浏览文件 @
703ac00b
...
...
@@ -127,6 +127,7 @@ public class IntegrationTestBase {
brokerConfig
.
setBrokerName
(
BROKER_NAME_PREFIX
+
BROKER_INDEX
.
getAndIncrement
());
brokerConfig
.
setBrokerIP1
(
"127.0.0.1"
);
brokerConfig
.
setNamesrvAddr
(
nsAddr
);
brokerConfig
.
setEnablePropertyFilter
(
true
);
storeConfig
.
setStorePathRootDir
(
baseDir
);
storeConfig
.
setStorePathCommitLog
(
baseDir
+
SEP
+
"commitlog"
);
storeConfig
.
setHaListenPort
(
8000
+
random
.
nextInt
(
1000
));
...
...
test/src/test/java/org/apache/rocketmq/test/client/consumer/filter/SqlFilterIT.java
0 → 100644
浏览文件 @
703ac00b
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.test.client.consumer.filter
;
import
org.apache.log4j.Logger
;
import
org.apache.rocketmq.client.consumer.MessageSelector
;
import
org.apache.rocketmq.test.base.BaseConf
;
import
org.apache.rocketmq.test.client.consumer.broadcast.BaseBroadCastIT
;
import
org.apache.rocketmq.test.client.consumer.broadcast.normal.NormalMsgTwoSameGroupConsumerIT
;
import
org.apache.rocketmq.test.client.rmq.RMQBroadCastConsumer
;
import
org.apache.rocketmq.test.client.rmq.RMQNormalProducer
;
import
org.apache.rocketmq.test.client.rmq.RMQSqlConsumer
;
import
org.apache.rocketmq.test.factory.ConsumerFactory
;
import
org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListner
;
import
org.apache.rocketmq.test.util.VerifyUtils
;
import
org.junit.After
;
import
org.junit.Assert
;
import
org.junit.Before
;
import
org.junit.Test
;
import
static
com
.
google
.
common
.
truth
.
Truth
.
assertThat
;
public
class
SqlFilterIT
extends
BaseConf
{
private
static
Logger
logger
=
Logger
.
getLogger
(
SqlFilterIT
.
class
);
private
RMQNormalProducer
producer
=
null
;
private
String
topic
=
null
;
@Before
public
void
setUp
()
{
topic
=
initTopic
();
logger
.
info
(
String
.
format
(
"use topic: %s;"
,
topic
));
producer
=
getProducer
(
nsAddr
,
topic
);
}
@After
public
void
tearDown
()
{
super
.
shutDown
();
}
@Test
public
void
testFilterConsumer
()
throws
Exception
{
int
msgSize
=
16
;
String
group
=
initConsumerGroup
();
MessageSelector
selector
=
MessageSelector
.
bySql
(
"(TAGS is not null and TAGS in ('TagA', 'TagB'))"
);
RMQSqlConsumer
consumer
=
ConsumerFactory
.
getRMQSqlConsumer
(
nsAddr
,
group
,
topic
,
selector
,
new
RMQNormalListner
(
group
+
"_1"
));
Thread
.
sleep
(
3000
);
producer
.
send
(
"TagA"
,
msgSize
);
producer
.
send
(
"TagB"
,
msgSize
);
producer
.
send
(
"TagC"
,
msgSize
);
Assert
.
assertEquals
(
"Not all sent succeeded"
,
msgSize
*
3
,
producer
.
getAllUndupMsgBody
().
size
());
consumer
.
getListner
().
waitForMessageConsume
(
msgSize
*
2
,
consumeTime
);
assertThat
(
producer
.
getAllMsgBody
())
.
containsAllIn
(
VerifyUtils
.
getFilterdMessage
(
producer
.
getAllMsgBody
(),
consumer
.
getListner
().
getAllMsgBody
()));
assertThat
(
consumer
.
getListner
().
getAllMsgBody
().
size
()).
isEqualTo
(
msgSize
*
2
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录