From 261fb5ee20b6dacca1072d64664f006da34d07c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A7=A6=E8=8B=B1=E6=9D=B0?= <327782001@qq.com> Date: Fri, 26 May 2023 14:57:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E5=8F=91=E9=80=81=E5=90=84=E7=A7=8D?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/batch/Consumer.java | 12 +++++--- .../src/main/java/batch/Producer.java | 22 ++++++-------- .../src/main/java/delay/Consumer.java | 29 +++++++++---------- .../src/main/java/delay/Producer.java | 15 +++++----- .../src/main/java/filter/sql/Consumer.java | 4 +-- .../src/main/java/filter/sql/Producer.java | 10 +------ .../src/main/java/filter/tag/Consumer.java | 4 +-- .../src/main/java/filter/tag/Producer.java | 8 +---- .../src/main/java/order/Consumer.java | 15 ++++++---- .../src/main/java/order/OrderStep.java | 13 +-------- .../src/main/java/order/Producer.java | 12 +++++--- .../src/main/java/transaction/Consumer.java | 26 +++++++---------- .../src/main/java/transaction/Producer.java | 16 ++++------ 13 files changed, 76 insertions(+), 110 deletions(-) diff --git a/rocketmq-demo/src/main/java/batch/Consumer.java b/rocketmq-demo/src/main/java/batch/Consumer.java index 4db21fc..0202c90 100644 --- a/rocketmq-demo/src/main/java/batch/Consumer.java +++ b/rocketmq-demo/src/main/java/batch/Consumer.java @@ -8,6 +8,13 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.List; +/** + * 消费消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:49 + */ public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 @@ -16,10 +23,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("BatchTopic", "*"); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -31,7 +36,6 @@ public class Consumer { }); //5.启动消费者consumer consumer.start(); - System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/batch/Producer.java b/rocketmq-demo/src/main/java/batch/Producer.java index cc13734..b5ba1c3 100644 --- a/rocketmq-demo/src/main/java/batch/Producer.java +++ b/rocketmq-demo/src/main/java/batch/Producer.java @@ -9,8 +9,15 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -public class Producer { +/** + * 批量发送消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:49 + */ +public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -18,11 +25,7 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - - List msgs = new ArrayList(); - - //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic @@ -32,24 +35,17 @@ public class Producer { Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes()); Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes()); Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes()); - msgs.add(msg1); msgs.add(msg2); msgs.add(msg3); - //5.发送消息 SendResult result = producer.send(msgs); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); - - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/delay/Consumer.java b/rocketmq-demo/src/main/java/delay/Consumer.java index 7960414..5ffa5b7 100644 --- a/rocketmq-demo/src/main/java/delay/Consumer.java +++ b/rocketmq-demo/src/main/java/delay/Consumer.java @@ -1,15 +1,18 @@ package delay; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - +/** + * + * 延迟消息消费 + *@version : 2.2.0 + *@author : qinyingjie + *@date : 2023/5/26 14:46 + */ public class Consumer { - public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); @@ -17,22 +20,16 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("DelayTopic", "*"); - //4.设置回调函数,处理消息 - consumer.registerMessageListener(new MessageListenerConcurrently() { - - //接受消息内容 - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - for (MessageExt msg : msgs) { - System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp())); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + //接受消息内容 + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp())); } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //5.启动消费者consumer consumer.start(); - System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/delay/Producer.java b/rocketmq-demo/src/main/java/delay/Producer.java index 0275caf..7ae4818 100644 --- a/rocketmq-demo/src/main/java/delay/Producer.java +++ b/rocketmq-demo/src/main/java/delay/Producer.java @@ -10,8 +10,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import java.util.concurrent.TimeUnit; +/** + * 延迟消息生产 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:46 + */ public class Producer { - public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -19,7 +25,6 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -34,15 +39,11 @@ public class Producer { SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/sql/Consumer.java b/rocketmq-demo/src/main/java/filter/sql/Consumer.java index 7df5470..aed5cee 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Consumer.java @@ -17,10 +17,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -34,4 +32,4 @@ public class Consumer { consumer.start(); System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/sql/Producer.java b/rocketmq-demo/src/main/java/filter/sql/Producer.java index ef1a714..d8c2ff8 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Producer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Producer.java @@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -16,7 +15,6 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -25,22 +23,16 @@ public class Producer { * 参数三:消息内容 */ Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes()); - msg.putUserProperty("i", String.valueOf(i)); - //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(2); } - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/tag/Consumer.java b/rocketmq-demo/src/main/java/filter/tag/Consumer.java index 5180f4e..3c6f104 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Consumer.java @@ -16,10 +16,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -33,4 +31,4 @@ public class Consumer { consumer.start(); System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/tag/Producer.java b/rocketmq-demo/src/main/java/filter/tag/Producer.java index 6af049e..2e0aa3d 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Producer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Producer.java @@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -16,7 +15,6 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -29,15 +27,11 @@ public class Producer { SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/order/Consumer.java b/rocketmq-demo/src/main/java/order/Consumer.java index 05adca4..2a380da 100644 --- a/rocketmq-demo/src/main/java/order/Consumer.java +++ b/rocketmq-demo/src/main/java/order/Consumer.java @@ -9,6 +9,14 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.List; + +/** + * 顺序消费 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:40 + */ public class Consumer { public static void main(String[] args) throws MQClientException { //1.创建消费者Consumer,制定消费者组名 @@ -17,10 +25,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("OrderTopic", "*"); - //4.注册消息监听器 consumer.registerMessageListener(new MessageListenerOrderly() { - @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { @@ -29,11 +35,8 @@ public class Consumer { return ConsumeOrderlyStatus.SUCCESS; } }); - //5.启动消费者 consumer.start(); - System.out.println("消费者启动"); - } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/order/OrderStep.java b/rocketmq-demo/src/main/java/order/OrderStep.java index c5f1213..acd251d 100644 --- a/rocketmq-demo/src/main/java/order/OrderStep.java +++ b/rocketmq-demo/src/main/java/order/OrderStep.java @@ -39,57 +39,46 @@ public class OrderStep { // 1065L : 创建 付款 // 7235L :创建 付款 List orderList = new ArrayList(); - OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); - return orderList; } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/order/Producer.java b/rocketmq-demo/src/main/java/order/Producer.java index 16dcfce..1afa895 100644 --- a/rocketmq-demo/src/main/java/order/Producer.java +++ b/rocketmq-demo/src/main/java/order/Producer.java @@ -8,8 +8,14 @@ import org.apache.rocketmq.common.message.MessageQueue; import java.util.List; +/** + * 顺序消息生产者 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:42 + */ public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -43,10 +49,8 @@ public class Producer { return mqs.get((int) index); } }, orderSteps.get(i).getOrderId()); - System.out.println("发送结果:" + sendResult); } producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/transaction/Consumer.java b/rocketmq-demo/src/main/java/transaction/Consumer.java index 2e01495..9f988bd 100644 --- a/rocketmq-demo/src/main/java/transaction/Consumer.java +++ b/rocketmq-demo/src/main/java/transaction/Consumer.java @@ -1,18 +1,18 @@ package transaction; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - /** * 消息的接受者 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:57 */ public class Consumer { - public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); @@ -20,22 +20,16 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("TransactionTopic", "*"); - - //4.设置回调函数,处理消息 - consumer.registerMessageListener(new MessageListenerConcurrently() { - - //接受消息内容 - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - for (MessageExt msg : msgs) { - System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + //接受消息内容 + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //5.启动消费者consumer consumer.start(); System.out.println("生产者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/transaction/Producer.java b/rocketmq-demo/src/main/java/transaction/Producer.java index e079a42..a608fa1 100644 --- a/rocketmq-demo/src/main/java/transaction/Producer.java +++ b/rocketmq-demo/src/main/java/transaction/Producer.java @@ -8,16 +8,18 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.concurrent.TimeUnit; /** - * 发送同步消息 + * 发送事务消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:57 */ public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 TransactionMQProducer producer = new TransactionMQProducer("group5"); //2.指定Nameserver地址 producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); - //添加事务监听器 producer.setTransactionListener(new TransactionListener() { /** @@ -49,12 +51,9 @@ public class Producer { return LocalTransactionState.COMMIT_MESSAGE; } }); - //3.启动producer producer.start(); - String[] tags = {"TAGA", "TAGB", "TAGC"}; - for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -67,14 +66,11 @@ public class Producer { SendResult result = producer.sendMessageInTransaction(msg, null); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(2); } - //6.关闭生产者producer //producer.shutdown(); } -} +} \ No newline at end of file -- GitLab