提交 3b9f4bb0 编写于 作者: R ruozhuliufeng

更新依赖版本;新增生产者发送超时时限

上级 9edf6dff
......@@ -19,7 +19,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
<version>5.1.0</version>
</dependency>
</dependencies>
</project>
......@@ -17,6 +17,8 @@ public class BatchProducer {
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 设置发送超时时限为20s,默认3s
producer.setSendMsgTimeout(20000);
// 指定要发送的消息的最大大小,默认是4M
// 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的maxMessageSize属性
// producer.setMaxMessageSize(8 * 1024 * 1024);
......
......@@ -17,6 +17,8 @@ public class DelayProducer {
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 设置发送超时时限为20s,默认3s
producer.setSendMsgTimeout(20000);
// 开启Producer
producer.start();
for (int i = 0; i < 100; i++) {
......
......@@ -14,6 +14,8 @@ public class FilterBySQLProducer {
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 设置发送超时时限为20s,默认3s
producer.setSendMsgTimeout(20000);
// 开启Producer
producer.start();
for (int i = 0; i < 10; i++) {
......
......@@ -14,6 +14,8 @@ public class FilterByTagProducer {
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 设置发送超时时限为20s,默认3s
producer.setSendMsgTimeout(20000);
// 开启Producer
producer.start();
String[] tags = {"myTagA", "myTagB", "myTagC"};
......
......@@ -25,7 +25,7 @@ public class NormalConsumer {
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费Topic与Tag
consumer.subscribe("somTopic","*");
consumer.subscribe("testTopic","*");
// 指定采用“广播模式”进行消费,默认为集群模式
// consumer.setMessageModel(MessageModel.BROADCASTING);
// 注册消息监听器
......
......@@ -21,13 +21,15 @@ public class AsyncProducer {
producer.setRetryTimesWhenSendAsyncFailed(0);
// 指定新创建的Topic的Queue数量是2,默认为4
producer.setDefaultTopicQueueNums(2);
// 设置发送超时时限为10s,默认3s
producer.setSendMsgTimeout(20000);
// 开启生产者
producer.start();
// 生产并发送100条消息
for (int i = 0; i < 100; i++) {
byte[] body = ("Hi,RocketMQ Async Test "+i).getBytes();
try {
Message msg = new Message("testAsyncTopic", "testAsyncTag", body);
Message msg = new Message("testTopic", "testAsyncTag", body);
// 为消息指定key
msg.setKeys("key-" + i);
// 异步发送,指定回调
......@@ -50,7 +52,7 @@ public class AsyncProducer {
/// sleep 一会
// 由于采用的是异步发送,所以若这里不sleep
// 则消息还未发送就会被producer给关闭,报错
TimeUnit.SECONDS.sleep(3);
TimeUnit.SECONDS.sleep(Long.MAX_VALUE);
// 关闭 生产者
producer.shutdown();
}
......
......@@ -11,6 +11,7 @@ public class OnewayProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("pg");
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
producer.setSendMsgTimeout(20000);
producer.start();
for (int i = 0; i < 100; i++) {
byte[] body = ("Hi,RocketMQ Oneway Test "+i).getBytes();
......
......@@ -15,12 +15,14 @@ public class OrderedProducer {
DefaultMQProducer producer = new DefaultMQProducer("pg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 设置发送超时时限为20s,默认3s
producer.setSendMsgTimeout(20000);
// 开启Producer
producer.start();
for (int i = 0; i < 100; i++) {
Integer orderId = i;
byte[] body = ("Hi, Ordered Msg "+i).getBytes();
Message msg = new Message("TopicA","TagA",body);
Message msg = new Message("testTopic","TagA",body);
SendResult sendResult = producer.send(msg, (list, message, o) -> {
Integer id = (Integer) o;
int index = id % list.size();
......
......@@ -23,7 +23,7 @@ public class TransactionConsumer {
// 指定从第一条消息开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 指定消费Topic与Tag
consumer.subscribe("somTopic","*");
consumer.subscribe("TTopic","*");
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
// 一旦Broker中有其订阅的消息就会触发该方法的执行,
......
......@@ -14,6 +14,8 @@ public class TransactionProducer {
TransactionMQProducer producer = new TransactionMQProducer("tpg");
// 指定NameServer地址
producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR);
// 设置发送超时时限为20s,默认3s
producer.setSendMsgTimeout(20000);
/**
* 定义一个线程池
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册