diff --git a/rocketmq-java/pom.xml b/rocketmq-java/pom.xml index da93d941dc7262abe8cc2620856888ab65de4720..f6348f12ffe0a3ce71ccc34fd8aff80cf2a6a42e 100644 --- a/rocketmq-java/pom.xml +++ b/rocketmq-java/pom.xml @@ -19,7 +19,7 @@ org.apache.rocketmq rocketmq-client - 4.8.0 + 5.1.0 diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/batch/producer/BatchProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/batch/producer/BatchProducer.java index de32e5fb2031d74d2ab4e0c5202c2b56899e9479..b244f65e654b8b9790ed139166ae04ef71a13a09 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/batch/producer/BatchProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/batch/producer/BatchProducer.java @@ -17,6 +17,8 @@ public class BatchProducer { DefaultMQProducer producer = new DefaultMQProducer("pg"); // 指定NameServer地址 producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 设置发送超时时限为20s,默认3s + producer.setSendMsgTimeout(20000); // 指定要发送的消息的最大大小,默认是4M // 不过,仅修改该属性是不行的,还需要同时修改broker加载的配置文件中的maxMessageSize属性 // producer.setMaxMessageSize(8 * 1024 * 1024); diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/delay/producer/DelayProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/delay/producer/DelayProducer.java index ed11357c033753724f44b94a81d1ac73aceae005..b9142e67ef6bb3bc1f86267519186bb6cb9b6a10 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/delay/producer/DelayProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/delay/producer/DelayProducer.java @@ -17,6 +17,8 @@ public class DelayProducer { DefaultMQProducer producer = new DefaultMQProducer("pg"); // 指定NameServer地址 producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 设置发送超时时限为20s,默认3s + producer.setSendMsgTimeout(20000); // 开启Producer producer.start(); for (int i = 0; i < 100; i++) { diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java index 3d5226c2f12b31ef9ddec968241a52c2b37c33d8..af5ce3b40c786c868c68a126fa91e700455edefc 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterBySQLProducer.java @@ -14,6 +14,8 @@ public class FilterBySQLProducer { DefaultMQProducer producer = new DefaultMQProducer("pg"); // 指定NameServer地址 producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 设置发送超时时限为20s,默认3s + producer.setSendMsgTimeout(20000); // 开启Producer producer.start(); for (int i = 0; i < 10; i++) { diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java index 89d6dfcd19190b433d918a3b2abb3d672a653a54..258ab9340d3dbddac29a734ab31d1f6d25e9c624 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/filter/producer/FilterByTagProducer.java @@ -14,6 +14,8 @@ public class FilterByTagProducer { DefaultMQProducer producer = new DefaultMQProducer("pg"); // 指定NameServer地址 producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 设置发送超时时限为20s,默认3s + producer.setSendMsgTimeout(20000); // 开启Producer producer.start(); String[] tags = {"myTagA", "myTagB", "myTagC"}; diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/consumer/NormalConsumer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/consumer/NormalConsumer.java index fab7facc9f67269add5954d89386c054f89402fe..118548e3be59a0ca8c086fd38d096da2444f2e37 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/consumer/NormalConsumer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/consumer/NormalConsumer.java @@ -25,7 +25,7 @@ public class NormalConsumer { // 指定从第一条消息开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定消费Topic与Tag - consumer.subscribe("somTopic","*"); + consumer.subscribe("testTopic","*"); // 指定采用“广播模式”进行消费,默认为集群模式 // consumer.setMessageModel(MessageModel.BROADCASTING); // 注册消息监听器 diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/AsyncProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/AsyncProducer.java index c08b97d3b80aa5ae2a03a8fcd7a0fd06d8b46142..9b03ea82920945fb4cc1ece3300ebe249362a27f 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/AsyncProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/AsyncProducer.java @@ -21,13 +21,15 @@ public class AsyncProducer { producer.setRetryTimesWhenSendAsyncFailed(0); // 指定新创建的Topic的Queue数量是2,默认为4 producer.setDefaultTopicQueueNums(2); + // 设置发送超时时限为10s,默认3s + producer.setSendMsgTimeout(20000); // 开启生产者 producer.start(); // 生产并发送100条消息 for (int i = 0; i < 100; i++) { byte[] body = ("Hi,RocketMQ Async Test "+i).getBytes(); try { - Message msg = new Message("testAsyncTopic", "testAsyncTag", body); + Message msg = new Message("testTopic", "testAsyncTag", body); // 为消息指定key msg.setKeys("key-" + i); // 异步发送,指定回调 @@ -50,7 +52,7 @@ public class AsyncProducer { /// sleep 一会 // 由于采用的是异步发送,所以若这里不sleep // 则消息还未发送就会被producer给关闭,报错 - TimeUnit.SECONDS.sleep(3); + TimeUnit.SECONDS.sleep(Long.MAX_VALUE); // 关闭 生产者 producer.shutdown(); } diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/OnewayProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/OnewayProducer.java index 54fe6a4f18e2048535c939b87b3dd1f407ee9593..bdcf4ffea2c19dcab32627b9687fc710832bbdfc 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/OnewayProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/normal/producer/OnewayProducer.java @@ -11,6 +11,7 @@ public class OnewayProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("pg"); producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + producer.setSendMsgTimeout(20000); producer.start(); for (int i = 0; i < 100; i++) { byte[] body = ("Hi,RocketMQ Oneway Test "+i).getBytes(); diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/ordered/producer/OrderedProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/ordered/producer/OrderedProducer.java index 9a3831bcd1cf34957ec1f1c8731019aee2ca3476..b6e3077e109e26fd0c7bd45ac9ff4a7531e42a4d 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/ordered/producer/OrderedProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/ordered/producer/OrderedProducer.java @@ -15,12 +15,14 @@ public class OrderedProducer { DefaultMQProducer producer = new DefaultMQProducer("pg"); // 指定NameServer地址 producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 设置发送超时时限为20s,默认3s + producer.setSendMsgTimeout(20000); // 开启Producer producer.start(); for (int i = 0; i < 100; i++) { Integer orderId = i; byte[] body = ("Hi, Ordered Msg "+i).getBytes(); - Message msg = new Message("TopicA","TagA",body); + Message msg = new Message("testTopic","TagA",body); SendResult sendResult = producer.send(msg, (list, message, o) -> { Integer id = (Integer) o; int index = id % list.size(); diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/consumer/TransactionConsumer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/consumer/TransactionConsumer.java index b1f7c7fb7a4cd8b81b77a76e3a9fdfe3bce488b2..13f9659064027893fd4b2e5df955a44b9e178ca4 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/consumer/TransactionConsumer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/consumer/TransactionConsumer.java @@ -23,7 +23,7 @@ public class TransactionConsumer { // 指定从第一条消息开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // 指定消费Topic与Tag - consumer.subscribe("somTopic","*"); + consumer.subscribe("TTopic","*"); // 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { // 一旦Broker中有其订阅的消息就会触发该方法的执行, diff --git a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/producer/TransactionProducer.java b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/producer/TransactionProducer.java index 4b56867e1db9317e3099be3576410d8f26103acd..c6ee0179926711b464a2b07a68f2795e2242ff88 100644 --- a/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/producer/TransactionProducer.java +++ b/rocketmq-java/src/main/java/tech/msop/test/rocketmq/transaction/producer/TransactionProducer.java @@ -14,6 +14,8 @@ public class TransactionProducer { TransactionMQProducer producer = new TransactionMQProducer("tpg"); // 指定NameServer地址 producer.setNamesrvAddr(RocketMQConstants.NAMESRV_ADDR); + // 设置发送超时时限为20s,默认3s + producer.setSendMsgTimeout(20000); /** * 定义一个线程池 *