From 264aa2d9ea5db89f796dd2e3aba463db68b30822 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A7=A6=E8=8B=B1=E6=9D=B0?= <327782001@qq.com> Date: Fri, 26 May 2023 14:19:11 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E6=9C=80=E5=9F=BA=E7=A1=80=E7=9A=84?= =?UTF-8?q?=E5=8F=91=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/base/consumer/Consumer.java | 16 +++++----------- .../main/java/base/producer/AsyncProducer.java | 5 +++++ .../main/java/base/producer/OneWayProducer.java | 8 ++++++++ .../main/java/base/producer/SyncProducer.java | 11 +++++------ 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/rocketmq-demo/src/main/java/base/consumer/Consumer.java b/rocketmq-demo/src/main/java/base/consumer/Consumer.java index 9cb3606..97c9a82 100644 --- a/rocketmq-demo/src/main/java/base/consumer/Consumer.java +++ b/rocketmq-demo/src/main/java/base/consumer/Consumer.java @@ -1,14 +1,11 @@ package base.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.util.List; - /** * 消息的接受者 * @@ -28,15 +25,12 @@ public class Consumer { //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //4.设置回调函数,处理消息 - consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - for (MessageExt msg : msgs) { - System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + //接受消息内容 + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //5.启动消费者consumer consumer.start(); diff --git a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java index e9d348b..2c54e77 100644 --- a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java @@ -7,8 +7,13 @@ import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; + /** * 发送异步消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:17 */ public class AsyncProducer { public static void main(String[] args) throws Exception { diff --git a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java index d890369..a821da5 100644 --- a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java @@ -1,10 +1,18 @@ package base.producer; + import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; + import java.util.concurrent.TimeUnit; + + /** * 发送单向消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:18 */ public class OneWayProducer { public static void main(String[] args) throws Exception, MQBrokerException { diff --git a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java index f44a46e..395696c 100644 --- a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java @@ -9,9 +9,12 @@ import java.util.concurrent.TimeUnit; /** * 发送同步消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:18 */ public class SyncProducer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -20,7 +23,6 @@ public class SyncProducer { producer.setSendMessageWithVIPChannel(false); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -33,14 +35,11 @@ public class SyncProducer { SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } -} +} \ No newline at end of file -- GitLab