diff --git a/rocketmq-demo/src/main/java/batch/Consumer.java b/rocketmq-demo/src/main/java/batch/Consumer.java index 4db21fcadfa8acd9527d6bd097a44a161d4a3e42..0202c90accc7b3fcdde9bf66c4dda3cc490fd009 100644 --- a/rocketmq-demo/src/main/java/batch/Consumer.java +++ b/rocketmq-demo/src/main/java/batch/Consumer.java @@ -8,6 +8,13 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.List; +/** + * 消费消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:49 + */ public class Consumer { public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 @@ -16,10 +23,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("BatchTopic", "*"); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -31,7 +36,6 @@ public class Consumer { }); //5.启动消费者consumer consumer.start(); - System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/batch/Producer.java b/rocketmq-demo/src/main/java/batch/Producer.java index cc137344bc3f6e54950ae537d7d4e26c16335993..b5ba1c34c316d5277dbc80d42169a365e0d86f02 100644 --- a/rocketmq-demo/src/main/java/batch/Producer.java +++ b/rocketmq-demo/src/main/java/batch/Producer.java @@ -9,8 +9,15 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; -public class Producer { +/** + * 批量发送消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:49 + */ +public class Producer { public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -18,11 +25,7 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - - List msgs = new ArrayList(); - - //4.创建消息对象,指定主题Topic、Tag和消息体 /** * 参数一:消息主题Topic @@ -32,24 +35,17 @@ public class Producer { Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes()); Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes()); Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes()); - msgs.add(msg1); msgs.add(msg2); msgs.add(msg3); - //5.发送消息 SendResult result = producer.send(msgs); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); - - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/delay/Consumer.java b/rocketmq-demo/src/main/java/delay/Consumer.java index 7960414643d62a31b05c7c641ca2dbc8614a9fef..5ffa5b79a770012da663bdeae8e04fe212c1e24f 100644 --- a/rocketmq-demo/src/main/java/delay/Consumer.java +++ b/rocketmq-demo/src/main/java/delay/Consumer.java @@ -1,15 +1,18 @@ package delay; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - +/** + * + * 延迟消息消费 + *@version : 2.2.0 + *@author : qinyingjie + *@date : 2023/5/26 14:46 + */ public class Consumer { - public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); @@ -17,22 +20,16 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("DelayTopic", "*"); - //4.设置回调函数,处理消息 - consumer.registerMessageListener(new MessageListenerConcurrently() { - - //接受消息内容 - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - for (MessageExt msg : msgs) { - System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp())); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + //接受消息内容 + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp())); } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //5.启动消费者consumer consumer.start(); - System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/delay/Producer.java b/rocketmq-demo/src/main/java/delay/Producer.java index 0275cafd27d49dbc70f4a31a95a7cd300c359fb3..7ae4818bf2fad36a49914fe9c1156cdfce0ed178 100644 --- a/rocketmq-demo/src/main/java/delay/Producer.java +++ b/rocketmq-demo/src/main/java/delay/Producer.java @@ -10,8 +10,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException; import java.util.concurrent.TimeUnit; +/** + * 延迟消息生产 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:46 + */ public class Producer { - public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -19,7 +25,6 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -34,15 +39,11 @@ public class Producer { SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/sql/Consumer.java b/rocketmq-demo/src/main/java/filter/sql/Consumer.java index 7df5470d7f2cd1a24133407f0e5529781f953bf8..aed5cee021be947a17f3c652a3b2a6abb8d4cc33 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Consumer.java @@ -17,10 +17,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -34,4 +32,4 @@ public class Consumer { consumer.start(); System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/sql/Producer.java b/rocketmq-demo/src/main/java/filter/sql/Producer.java index ef1a714eb90759bd1475ca7cd5a6db663db481a3..d8c2ff8489bc25f757f8612c81e816a63436f74f 100644 --- a/rocketmq-demo/src/main/java/filter/sql/Producer.java +++ b/rocketmq-demo/src/main/java/filter/sql/Producer.java @@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -16,7 +15,6 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -25,22 +23,16 @@ public class Producer { * 参数三:消息内容 */ Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes()); - msg.putUserProperty("i", String.valueOf(i)); - //5.发送消息 SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(2); } - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/tag/Consumer.java b/rocketmq-demo/src/main/java/filter/tag/Consumer.java index 5180f4e5bc865bb28eb7ee3bb82952de61514fa1..3c6f104426deb7bca0acf15fb93bfc3b698dc766 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Consumer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Consumer.java @@ -16,10 +16,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); - //4.设置回调函数,处理消息 consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 @Override public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { @@ -33,4 +31,4 @@ public class Consumer { consumer.start(); System.out.println("消费者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/filter/tag/Producer.java b/rocketmq-demo/src/main/java/filter/tag/Producer.java index 6af049e296479de6a8f38472110d04310db40b27..2e0aa3d7174a120e02b1407c44a4053e24173dde 100644 --- a/rocketmq-demo/src/main/java/filter/tag/Producer.java +++ b/rocketmq-demo/src/main/java/filter/tag/Producer.java @@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -16,7 +15,6 @@ public class Producer { producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.启动producer producer.start(); - for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -29,15 +27,11 @@ public class Producer { SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/order/Consumer.java b/rocketmq-demo/src/main/java/order/Consumer.java index 05adca475d21ed7081bb02d3dfc6a145a5e394da..2a380dae019e9dd86cbc3e8f4994f0ba7aa51e8f 100644 --- a/rocketmq-demo/src/main/java/order/Consumer.java +++ b/rocketmq-demo/src/main/java/order/Consumer.java @@ -9,6 +9,14 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.List; + +/** + * 顺序消费 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:40 + */ public class Consumer { public static void main(String[] args) throws MQClientException { //1.创建消费者Consumer,制定消费者组名 @@ -17,10 +25,8 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("OrderTopic", "*"); - //4.注册消息监听器 consumer.registerMessageListener(new MessageListenerOrderly() { - @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { @@ -29,11 +35,8 @@ public class Consumer { return ConsumeOrderlyStatus.SUCCESS; } }); - //5.启动消费者 consumer.start(); - System.out.println("消费者启动"); - } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/order/OrderStep.java b/rocketmq-demo/src/main/java/order/OrderStep.java index c5f121366e0ec9f27258936152af86515c239f03..acd251de5264f19f14f43c4958bf5816c315219c 100644 --- a/rocketmq-demo/src/main/java/order/OrderStep.java +++ b/rocketmq-demo/src/main/java/order/OrderStep.java @@ -39,57 +39,46 @@ public class OrderStep { // 1065L : 创建 付款 // 7235L :创建 付款 List orderList = new ArrayList(); - OrderStep orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("创建"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("创建"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("付款"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("创建"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("付款"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("付款"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1065L); orderDemo.setDesc("完成"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("推送"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(7235L); orderDemo.setDesc("完成"); orderList.add(orderDemo); - orderDemo = new OrderStep(); orderDemo.setOrderId(1039L); orderDemo.setDesc("完成"); orderList.add(orderDemo); - return orderList; } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/order/Producer.java b/rocketmq-demo/src/main/java/order/Producer.java index 16dcfce773df08f4856d890b93decf810dfe5821..1afa895aacabf3d17b26000d4174c83255b6635e 100644 --- a/rocketmq-demo/src/main/java/order/Producer.java +++ b/rocketmq-demo/src/main/java/order/Producer.java @@ -8,8 +8,14 @@ import org.apache.rocketmq.common.message.MessageQueue; import java.util.List; +/** + * 顺序消息生产者 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:42 + */ public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -43,10 +49,8 @@ public class Producer { return mqs.get((int) index); } }, orderSteps.get(i).getOrderId()); - System.out.println("发送结果:" + sendResult); } producer.shutdown(); } - -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/transaction/Consumer.java b/rocketmq-demo/src/main/java/transaction/Consumer.java index 2e014959f02677153ab09943168ac67c82f5f717..9f988bd1e46296aaf69727a3cefed632b2dd0579 100644 --- a/rocketmq-demo/src/main/java/transaction/Consumer.java +++ b/rocketmq-demo/src/main/java/transaction/Consumer.java @@ -1,18 +1,18 @@ package transaction; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; -import java.util.List; - /** * 消息的接受者 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:57 */ public class Consumer { - public static void main(String[] args) throws Exception { //1.创建消费者Consumer,制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); @@ -20,22 +20,16 @@ public class Consumer { consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); //3.订阅主题Topic和Tag consumer.subscribe("TransactionTopic", "*"); - - //4.设置回调函数,处理消息 - consumer.registerMessageListener(new MessageListenerConcurrently() { - - //接受消息内容 - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - for (MessageExt msg : msgs) { - System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + //接受消息内容 + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //5.启动消费者consumer consumer.start(); System.out.println("生产者启动"); } -} +} \ No newline at end of file diff --git a/rocketmq-demo/src/main/java/transaction/Producer.java b/rocketmq-demo/src/main/java/transaction/Producer.java index e079a42053582abaf5f15d57346a92d8da315c46..a608fa10905fbbf116e2751893fdb9d55e86f089 100644 --- a/rocketmq-demo/src/main/java/transaction/Producer.java +++ b/rocketmq-demo/src/main/java/transaction/Producer.java @@ -8,16 +8,18 @@ import org.apache.rocketmq.common.message.MessageExt; import java.util.concurrent.TimeUnit; /** - * 发送同步消息 + * 发送事务消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:57 */ public class Producer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 TransactionMQProducer producer = new TransactionMQProducer("group5"); //2.指定Nameserver地址 producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); - //添加事务监听器 producer.setTransactionListener(new TransactionListener() { /** @@ -49,12 +51,9 @@ public class Producer { return LocalTransactionState.COMMIT_MESSAGE; } }); - //3.启动producer producer.start(); - String[] tags = {"TAGA", "TAGB", "TAGC"}; - for (int i = 0; i < 3; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -67,14 +66,11 @@ public class Producer { SendResult result = producer.sendMessageInTransaction(msg, null); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(2); } - //6.关闭生产者producer //producer.shutdown(); } -} +} \ No newline at end of file