From f4cf9ed1da81c78c9f634abd6edaa78e51353fe8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A7=A6=E8=8B=B1=E6=9D=B0?= <327782001@qq.com> Date: Fri, 26 May 2023 12:22:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rocketmq-demo/pom.xml | 7 +++++++ rocketmq-demo/shuyu.iml | 13 +++++++++++++ .../src/main/java/base/consumer/Consumer.java | 12 ++++++------ .../src/main/java/base/producer/AsyncProducer.java | 10 ++++------ .../src/main/java/base/producer/OneWayProducer.java | 2 +- .../src/main/java/base/producer/SyncProducer.java | 2 +- rocketmq-demo/src/main/java/batch/Consumer.java | 2 +- rocketmq-demo/src/main/java/batch/Producer.java | 2 +- rocketmq-demo/src/main/java/delay/Consumer.java | 2 +- rocketmq-demo/src/main/java/delay/Producer.java | 2 +- .../src/main/java/filter/sql/Consumer.java | 2 +- .../src/main/java/filter/sql/Producer.java | 2 +- .../src/main/java/filter/tag/Consumer.java | 2 +- .../src/main/java/filter/tag/Producer.java | 2 +- rocketmq-demo/src/main/java/order/Consumer.java | 2 +- rocketmq-demo/src/main/java/order/Producer.java | 2 +- .../src/main/java/transaction/Consumer.java | 2 +- .../src/main/java/transaction/Producer.java | 3 ++- .../src/main/resources/application.properties | 2 +- .../src/main/resources/application.properties | 2 +- 20 files changed, 47 insertions(+), 28 deletions(-) diff --git a/rocketmq-demo/pom.xml b/rocketmq-demo/pom.xml index 7ed8918..5554ad3 100644 --- a/rocketmq-demo/pom.xml +++ b/rocketmq-demo/pom.xml @@ -12,5 +12,12 @@ 8 8 + + + org.apache.rocketmq + rocketmq-client + 4.8.0 + + \ No newline at end of file diff --git a/rocketmq-demo/shuyu.iml b/rocketmq-demo/shuyu.iml index c035f0b..a945e90 100644 --- a/rocketmq-demo/shuyu.iml +++ b/rocketmq-demo/shuyu.iml @@ -11,5 +11,18 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/base/consumer/Consumer.java b/rocketmq-demo/src/main/java/base/consumer/Consumer.java index abde08b..2ac732b 100644 --- a/rocketmq-demo/src/main/java/base/consumer/Consumer.java +++ b/rocketmq-demo/src/main/java/base/consumer/Consumer.java @@ -11,23 +11,23 @@ import java.util.List; /** * 消息的接受者 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 11:45 */ public class Consumer { - public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("base", "*"); - //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -40,4 +40,4 @@ public class Consumer { //5.启动消费者consumer consumer.start(); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java index 64977c4..f5f2a33 100644 --- a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java @@ -11,15 +11,13 @@ import java.util.concurrent.TimeUnit; * 发送异步消息 */ public class AsyncProducer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -34,6 +32,7 @@ public class AsyncProducer { * 发送成功回调函数 * @param sendResult */ + @Override public void onSuccess(SendResult sendResult) { System.out.println("发送结果:" + sendResult); } @@ -42,16 +41,15 @@ public class AsyncProducer { * 发送失败回调函数 * @param e */ + @Override public void onException(Throwable e) { System.out.println("发送异常:" + e); } }); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java index 3caadc8..30c0dce 100644 --- a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java @@ -15,7 +15,7 @@ public class OneWayProducer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java index b814224..4ef4c61 100644 --- a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java @@ -16,7 +16,7 @@ public class SyncProducer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/batch/Consumer.java b/rocketmq-demo/src/main/java/batch/Consumer.java index a3a30f1..4db21fc 100644 --- a/rocketmq-demo/src/main/java/batch/Consumer.java +++ b/rocketmq-demo/src/main/java/batch/Consumer.java @@ -13,7 +13,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("BatchTopic", "*"); diff --git a/rocketmq-demo/src/main/java/batch/Producer.java b/rocketmq-demo/src/main/java/batch/Producer.java index 621cd7f..cc13734 100644 --- a/rocketmq-demo/src/main/java/batch/Producer.java +++ b/rocketmq-demo/src/main/java/batch/Producer.java @@ -15,7 +15,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/delay/Consumer.java b/rocketmq-demo/src/main/java/delay/Consumer.java index dca52bb..7960414 100644 --- a/rocketmq-demo/src/main/java/delay/Consumer.java +++ b/rocketmq-demo/src/main/java/delay/Consumer.java @@ -14,7 +14,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("DelayTopic", "*"); diff --git a/rocketmq-demo/src/main/java/delay/Producer.java b/rocketmq-demo/src/main/java/delay/Producer.java index b910030..0275caf 100644 --- a/rocketmq-demo/src/main/java/delay/Producer.java +++ b/rocketmq-demo/src/main/java/delay/Producer.java @@ -16,7 +16,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/filter/sql/Consumer.java b/rocketmq-demo/src/main/java/filter/sql/Consumer.java index 90db71e..7df5470 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Consumer.java @@ -14,7 +14,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); diff --git a/rocketmq-demo/src/main/java/filter/sql/Producer.java b/rocketmq-demo/src/main/java/filter/sql/Producer.java index 36aa7bb..ef1a714 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Producer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Producer.java @@ -13,7 +13,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/filter/tag/Consumer.java b/rocketmq-demo/src/main/java/filter/tag/Consumer.java index b1c7d4d..5180f4e 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Consumer.java @@ -13,7 +13,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); diff --git a/rocketmq-demo/src/main/java/filter/tag/Producer.java b/rocketmq-demo/src/main/java/filter/tag/Producer.java index 6d967d3..6af049e 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Producer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Producer.java @@ -13,7 +13,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/order/Consumer.java b/rocketmq-demo/src/main/java/order/Consumer.java index 4fc9335..05adca4 100644 --- a/rocketmq-demo/src/main/java/order/Consumer.java +++ b/rocketmq-demo/src/main/java/order/Consumer.java @@ -14,7 +14,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("OrderTopic", "*"); diff --git a/rocketmq-demo/src/main/java/order/Producer.java b/rocketmq-demo/src/main/java/order/Producer.java index d075a94..16dcfce 100644 --- a/rocketmq-demo/src/main/java/order/Producer.java +++ b/rocketmq-demo/src/main/java/order/Producer.java @@ -14,7 +14,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); //构建消息集合 diff --git a/rocketmq-demo/src/main/java/transaction/Consumer.java b/rocketmq-demo/src/main/java/transaction/Consumer.java index c88ca9d..2e01495 100644 --- a/rocketmq-demo/src/main/java/transaction/Consumer.java +++ b/rocketmq-demo/src/main/java/transaction/Consumer.java @@ -17,7 +17,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("TransactionTopic", "*"); diff --git a/rocketmq-demo/src/main/java/transaction/Producer.java b/rocketmq-demo/src/main/java/transaction/Producer.java index 6640b74..e079a42 100644 --- a/rocketmq-demo/src/main/java/transaction/Producer.java +++ b/rocketmq-demo/src/main/java/transaction/Producer.java @@ -1,6 +1,7 @@ package transaction; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; @@ -15,7 +16,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 TransactionMQProducer producer = new TransactionMQProducer("group5"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //添加事务监听器 producer.setTransactionListener(new TransactionListener() { diff --git a/springboot-rocketmq-consumer/src/main/resources/application.properties b/springboot-rocketmq-consumer/src/main/resources/application.properties index 9151933..0fc21b3 100644 --- a/springboot-rocketmq-consumer/src/main/resources/application.properties +++ b/springboot-rocketmq-consumer/src/main/resources/application.properties @@ -1,2 +1,2 @@ -rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 +rocketmq.name-server=47.119.161.70:9876;47.119.163.226:9876 rocketmq.producer.group=my-group \ No newline at end of file diff --git a/springboot-rocketmq-producer/src/main/resources/application.properties b/springboot-rocketmq-producer/src/main/resources/application.properties index 9151933..0fc21b3 100644 --- a/springboot-rocketmq-producer/src/main/resources/application.properties +++ b/springboot-rocketmq-producer/src/main/resources/application.properties @@ -1,2 +1,2 @@ -rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 +rocketmq.name-server=47.119.161.70:9876;47.119.163.226:9876 rocketmq.producer.group=my-group \ No newline at end of file -- GitLab