Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
Apache RocketMQ
Rocketmq
提交
11f6eddb
R
Rocketmq
项目概览
Apache RocketMQ
/
Rocketmq
上一次同步 大约 3 年
通知
267
Star
16139
Fork
68
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
DevOps
流水线
流水线任务
计划
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
R
Rocketmq
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
DevOps
DevOps
流水线
流水线任务
计划
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
流水线任务
提交
Issue看板
未验证
提交
11f6eddb
编写于
1月 24, 2019
作者:
H
Heng Du
提交者:
GitHub
1月 24, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #620 from RongtongJin/polish_filter_example
[ISSUE #608] Polish filter example
上级
a7b0c27f
e2043765
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
76 addition
and
45 deletion
+76
-45
example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java
...org/apache/rocketmq/example/filter/SqlFilterConsumer.java
+10
-19
example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java
...org/apache/rocketmq/example/filter/SqlFilterProducer.java
+19
-17
example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java
...org/apache/rocketmq/example/filter/TagFilterConsumer.java
+3
-9
example/src/main/java/org/apache/rocketmq/example/filter/TagFilterProducer.java
...org/apache/rocketmq/example/filter/TagFilterProducer.java
+44
-0
未找到文件。
example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java
→
example/src/main/java/org/apache/rocketmq/example/filter/Sql
Filter
Consumer.java
浏览文件 @
11f6eddb
...
...
@@ -17,28 +17,24 @@
package
org.apache.rocketmq.example.filter
;
import
java.util.List
;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
import
org.apache.rocketmq.client.consumer.MessageSelector
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext
;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.message.MessageExt
;
import
java.util.List
;
public
class
SqlFilterConsumer
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
public
class
SqlConsumer
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name"
);
public
static
void
main
(
String
[]
args
)
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name_4"
);
try
{
consumer
.
subscribe
(
"TopicTest"
,
MessageSelector
.
bySql
(
"(TAGS is not null and TAGS in ('TagA', 'TagB'))"
+
"and (a is not null and a between 0 3)"
));
}
catch
(
MQClientException
e
)
{
e
.
printStackTrace
();
return
;
}
// Don't forget to set enablePropertyFilter=true in broker
consumer
.
subscribe
(
"SqlFilterTest"
,
MessageSelector
.
bySql
(
"(TAGS is not null and TAGS in ('TagA', 'TagB'))"
+
"and (a is not null and a between 0 and 3)"
));
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
...
...
@@ -50,12 +46,7 @@ public class SqlConsumer {
}
});
try
{
consumer
.
start
();
}
catch
(
MQClientException
e
)
{
e
.
printStackTrace
();
return
;
}
consumer
.
start
();
System
.
out
.
printf
(
"Consumer Started.%n"
);
}
}
example/src/main/java/org/apache/rocketmq/example/filter/Producer.java
→
example/src/main/java/org/apache/rocketmq/example/filter/
SqlFilter
Producer.java
浏览文件 @
11f6eddb
...
...
@@ -14,33 +14,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.example.filter
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
public
class
Producer
{
public
static
void
main
(
String
[]
args
)
throws
MQClientException
,
InterruptedException
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"ProducerGroupName"
);
public
class
SqlFilterProducer
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
);
producer
.
start
();
try
{
for
(
int
i
=
0
;
i
<
6000000
;
i
++)
{
Message
msg
=
new
Message
(
"TopicFilter7"
,
"TagA"
,
"OrderID001"
,
"Hello world"
.
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
));
msg
.
putUserProperty
(
"SequenceId"
,
String
.
valueOf
(
i
));
SendResult
sendResult
=
producer
.
send
(
msg
);
System
.
out
.
printf
(
"%s%n"
,
sendResult
);
}
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
String
[]
tags
=
new
String
[]
{
"TagA"
,
"TagB"
,
"TagC"
};
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
Message
msg
=
new
Message
(
"SqlFilterTest"
,
tags
[
i
%
tags
.
length
],
(
"Hello RocketMQ "
+
i
).
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
)
);
msg
.
putUserProperty
(
"a"
,
String
.
valueOf
(
i
));
SendResult
sendResult
=
producer
.
send
(
msg
);
System
.
out
.
printf
(
"%s%n"
,
sendResult
);
}
producer
.
shutdown
();
}
}
example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java
→
example/src/main/java/org/apache/rocketmq/example/filter/
TagFilter
Consumer.java
浏览文件 @
11f6eddb
...
...
@@ -16,7 +16,6 @@
*/
package
org.apache.rocketmq.example.filter
;
import
java.io.File
;
import
java.io.IOException
;
import
java.util.List
;
import
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
;
...
...
@@ -24,20 +23,15 @@ import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import
org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus
;
import
org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.common.MixAll
;
import
org.apache.rocketmq.common.message.MessageExt
;
public
class
Consumer
{
public
class
TagFilter
Consumer
{
public
static
void
main
(
String
[]
args
)
throws
InterruptedException
,
MQClientException
,
IOException
{
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"ConsumerGroupNamecc4"
);
ClassLoader
classLoader
=
Thread
.
currentThread
().
getContextClassLoader
();
File
classFile
=
new
File
(
classLoader
.
getResource
(
"MessageFilterImpl.java"
).
getFile
());
DefaultMQPushConsumer
consumer
=
new
DefaultMQPushConsumer
(
"please_rename_unique_group_name"
);
String
filterCode
=
MixAll
.
file2String
(
classFile
);
consumer
.
subscribe
(
"TopicTest"
,
"org.apache.rocketmq.example.filter.MessageFilterImpl"
,
filterCode
);
consumer
.
subscribe
(
"TagFilterTest"
,
"TagA || TagC"
);
consumer
.
registerMessageListener
(
new
MessageListenerConcurrently
()
{
...
...
example/src/main/java/org/apache/rocketmq/example/filter/
Sql
Producer.java
→
example/src/main/java/org/apache/rocketmq/example/filter/
TagFilter
Producer.java
浏览文件 @
11f6eddb
...
...
@@ -14,54 +14,31 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package
org.apache.rocketmq.example.filter
;
import
org.apache.rocketmq.client.exception.MQClientException
;
import
org.apache.rocketmq.client.producer.DefaultMQProducer
;
import
org.apache.rocketmq.client.producer.SendResult
;
import
org.apache.rocketmq.common.message.Message
;
import
org.apache.rocketmq.remoting.common.RemotingHelper
;
public
class
SqlProducer
{
public
class
TagFilterProducer
{
public
static
void
main
(
String
[]
args
)
throws
Exception
{
public
static
void
main
(
String
[]
args
)
{
DefaultMQProducer
producer
=
new
DefaultMQProducer
(
"please_rename_unique_group_name"
);
try
{
producer
.
start
();
}
catch
(
MQClientException
e
)
{
e
.
printStackTrace
();
return
;
}
producer
.
start
();
for
(
int
i
=
0
;
i
<
10
;
i
++)
{
try
{
String
tag
;
int
div
=
i
%
3
;
if
(
div
==
0
)
{
tag
=
"TagA"
;
}
else
if
(
div
==
1
)
{
tag
=
"TagB"
;
}
else
{
tag
=
"TagC"
;
}
Message
msg
=
new
Message
(
"TopicTest"
,
tag
,
(
"Hello RocketMQ "
+
i
).
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
)
);
msg
.
putUserProperty
(
"a"
,
String
.
valueOf
(
i
));
String
[]
tags
=
new
String
[]
{
"TagA"
,
"TagB"
,
"TagC"
};
SendResult
sendResult
=
producer
.
send
(
msg
);
System
.
out
.
printf
(
"%s%n"
,
sendResult
);
}
catch
(
Exception
e
)
{
e
.
printStackTrace
();
try
{
Thread
.
sleep
(
1000
);
}
catch
(
InterruptedException
e1
)
{
e1
.
printStackTrace
();
}
}
for
(
int
i
=
0
;
i
<
60
;
i
++)
{
Message
msg
=
new
Message
(
"TagFilterTest"
,
tags
[
i
%
tags
.
length
],
"Hello world"
.
getBytes
(
RemotingHelper
.
DEFAULT_CHARSET
));
SendResult
sendResult
=
producer
.
send
(
msg
);
System
.
out
.
printf
(
"%s%n"
,
sendResult
);
}
producer
.
shutdown
();
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录