diff --git a/rocketmq-demo/pom.xml b/rocketmq-demo/pom.xml index 7ed8918f19c57bad340337828f5479682f7d40eb..5554ad33e5500f8e7013d89a4d606ce5bf9271fa 100644 --- a/rocketmq-demo/pom.xml +++ b/rocketmq-demo/pom.xml @@ -12,5 +12,12 @@ 8 8 + + + org.apache.rocketmq + rocketmq-client + 4.8.0 + + \ No newline at end of file diff --git a/rocketmq-demo/shuyu.iml b/rocketmq-demo/shuyu.iml index c035f0b00817c7836efc40bf150e86d04273014e..a945e9026e839d67e75ddb6e84fa774f126494f8 100644 --- a/rocketmq-demo/shuyu.iml +++ b/rocketmq-demo/shuyu.iml @@ -11,5 +11,18 @@ + + + + + + + + + + + + + \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/base/consumer/Consumer.java b/rocketmq-demo/src/main/java/base/consumer/Consumer.java index abde08bd418d70debc56d4b31a3987a9d8f14be4..2ac732bbaed9f264009afd8a194a3906b3378b61 100644 --- a/rocketmq-demo/src/main/java/base/consumer/Consumer.java +++ b/rocketmq-demo/src/main/java/base/consumer/Consumer.java @@ -11,23 +11,23 @@ import java.util.List; /** * 消息的接受者 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 11:45 */ public class Consumer { - public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("base", "*"); - //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -40,4 +40,4 @@ public class Consumer { //5.启动消费者consumer consumer.start(); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java index 64977c4ce402884813bb2ad0fe8b57e9717f053a..f5f2a335666a394747025b035d3ea19b959cfd9c 100644 --- a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java @@ -11,15 +11,13 @@ import java.util.concurrent.TimeUnit; * 发送异步消息 */ public class AsyncProducer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -34,6 +32,7 @@ public class AsyncProducer { * 发送成功回调函数 * @param sendResult */ + @Override public void onSuccess(SendResult sendResult) { System.out.println("发送结果:" + sendResult); } @@ -42,16 +41,15 @@ public class AsyncProducer { * 发送失败回调函数 * @param e */ + @Override public void onException(Throwable e) { System.out.println("发送异常:" + e); } }); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java index 3caadc8da748061c4cfe95d268f372257497bac1..30c0dce36d93969a2d63a837c50c4bb9db619fc6 100644 --- a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java @@ -15,7 +15,7 @@ public class OneWayProducer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java index b814224be5a8d27adbdbc2515f1b9e07d7e8f408..4ef4c61aa7da746b819e7a757cc5e3de08848b02 100644 --- a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java @@ -16,7 +16,7 @@ public class SyncProducer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/batch/Consumer.java b/rocketmq-demo/src/main/java/batch/Consumer.java index a3a30f1962dfab9d77ebfe62edae914fe0dcb900..4db21fcadfa8acd9527d6bd097a44a161d4a3e42 100644 --- a/rocketmq-demo/src/main/java/batch/Consumer.java +++ b/rocketmq-demo/src/main/java/batch/Consumer.java @@ -13,7 +13,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("BatchTopic", "*"); diff --git a/rocketmq-demo/src/main/java/batch/Producer.java b/rocketmq-demo/src/main/java/batch/Producer.java index 621cd7f809181c6ce4772496f49c7240843d4e7a..cc137344bc3f6e54950ae537d7d4e26c16335993 100644 --- a/rocketmq-demo/src/main/java/batch/Producer.java +++ b/rocketmq-demo/src/main/java/batch/Producer.java @@ -15,7 +15,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/delay/Consumer.java b/rocketmq-demo/src/main/java/delay/Consumer.java index dca52bb197ecb154d2686eec370ebc4a6c8138ed..7960414643d62a31b05c7c641ca2dbc8614a9fef 100644 --- a/rocketmq-demo/src/main/java/delay/Consumer.java +++ b/rocketmq-demo/src/main/java/delay/Consumer.java @@ -14,7 +14,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("DelayTopic", "*"); diff --git a/rocketmq-demo/src/main/java/delay/Producer.java b/rocketmq-demo/src/main/java/delay/Producer.java index b910030d6ccefccde7894be71b24e012e30a70bb..0275cafd27d49dbc70f4a31a95a7cd300c359fb3 100644 --- a/rocketmq-demo/src/main/java/delay/Producer.java +++ b/rocketmq-demo/src/main/java/delay/Producer.java @@ -16,7 +16,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/filter/sql/Consumer.java b/rocketmq-demo/src/main/java/filter/sql/Consumer.java index 90db71e26c35c0c87cfd49df2156b4f89870022c..7df5470d7f2cd1a24133407f0e5529781f953bf8 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Consumer.java @@ -14,7 +14,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); diff --git a/rocketmq-demo/src/main/java/filter/sql/Producer.java b/rocketmq-demo/src/main/java/filter/sql/Producer.java index 36aa7bbc06564aac97ba60ba84714e5ff57240bf..ef1a714eb90759bd1475ca7cd5a6db663db481a3 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Producer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Producer.java @@ -13,7 +13,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/filter/tag/Consumer.java b/rocketmq-demo/src/main/java/filter/tag/Consumer.java index b1c7d4de51e98ae3ed977002652893f9af46bc16..5180f4e5bc865bb28eb7ee3bb82952de61514fa1 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Consumer.java @@ -13,7 +13,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); diff --git a/rocketmq-demo/src/main/java/filter/tag/Producer.java b/rocketmq-demo/src/main/java/filter/tag/Producer.java index 6d967d3389eaef3a81598e977f942b43502bf032..6af049e296479de6a8f38472110d04310db40b27 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Producer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Producer.java @@ -13,7 +13,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); diff --git a/rocketmq-demo/src/main/java/order/Consumer.java b/rocketmq-demo/src/main/java/order/Consumer.java index 4fc9335c3b1b95f0c0966a8cbfa05c3b9a05b7ef..05adca475d21ed7081bb02d3dfc6a145a5e394da 100644 --- a/rocketmq-demo/src/main/java/order/Consumer.java +++ b/rocketmq-demo/src/main/java/order/Consumer.java @@ -14,7 +14,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("OrderTopic", "*"); diff --git a/rocketmq-demo/src/main/java/order/Producer.java b/rocketmq-demo/src/main/java/order/Producer.java index d075a9482f06d49494cb658ae7298cca485628ce..16dcfce773df08f4856d890b93decf810dfe5821 100644 --- a/rocketmq-demo/src/main/java/order/Producer.java +++ b/rocketmq-demo/src/main/java/order/Producer.java @@ -14,7 +14,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); //构建消息集合 diff --git a/rocketmq-demo/src/main/java/transaction/Consumer.java b/rocketmq-demo/src/main/java/transaction/Consumer.java index c88ca9d591d9d31248dfcad9ddb3837db9a97e8e..2e014959f02677153ab09943168ac67c82f5f717 100644 --- a/rocketmq-demo/src/main/java/transaction/Consumer.java +++ b/rocketmq-demo/src/main/java/transaction/Consumer.java @@ -17,7 +17,7 @@ public class Consumer { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); //2.指定Nameserver地址 - consumer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("TransactionTopic", "*"); diff --git a/rocketmq-demo/src/main/java/transaction/Producer.java b/rocketmq-demo/src/main/java/transaction/Producer.java index 6640b748325aacfa449e737428d4e27ca613b2e1..e079a42053582abaf5f15d57346a92d8da315c46 100644 --- a/rocketmq-demo/src/main/java/transaction/Producer.java +++ b/rocketmq-demo/src/main/java/transaction/Producer.java @@ -1,6 +1,7 @@ package transaction; import org.apache.commons.lang3.StringUtils; +import org.apache.rocketmq.client.producer.*; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; @@ -15,7 +16,7 @@ public class Producer { //1.创建消息生产者producer,并制定生产者组名 TransactionMQProducer producer = new TransactionMQProducer("group5"); //2.指定Nameserver地址 - producer.setNamesrvAddr("192.168.25.135:9876;192.168.25.138:9876"); + producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //添加事务监听器 producer.setTransactionListener(new TransactionListener() { diff --git a/springboot-rocketmq-consumer/src/main/resources/application.properties b/springboot-rocketmq-consumer/src/main/resources/application.properties index 915193351dce7e258ee7fd3929f8fea2bd6b926f..0fc21b3b9bc721dd1d678c0a14af83eba74dc8cf 100644 --- a/springboot-rocketmq-consumer/src/main/resources/application.properties +++ b/springboot-rocketmq-consumer/src/main/resources/application.properties @@ -1,2 +1,2 @@ -rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 +rocketmq.name-server=47.119.161.70:9876;47.119.163.226:9876 rocketmq.producer.group=my-group \ No newline at end of file diff --git a/springboot-rocketmq-producer/src/main/resources/application.properties b/springboot-rocketmq-producer/src/main/resources/application.properties index 915193351dce7e258ee7fd3929f8fea2bd6b926f..0fc21b3b9bc721dd1d678c0a14af83eba74dc8cf 100644 --- a/springboot-rocketmq-producer/src/main/resources/application.properties +++ b/springboot-rocketmq-producer/src/main/resources/application.properties @@ -1,2 +1,2 @@ -rocketmq.name-server=192.168.25.135:9876;192.168.25.138:9876 +rocketmq.name-server=47.119.161.70:9876;47.119.163.226:9876 rocketmq.producer.group=my-group \ No newline at end of file