diff --git a/rocketmq-demo/src/main/java/base/consumer/Consumer.java b/rocketmq-demo/src/main/java/base/consumer/Consumer.java index 9cb3606c738c2b28344765f86963638b53a28ade..97c9a82b14d349a1e293f5d1e2b53c7fdb3d8dff 100644 --- a/rocketmq-demo/src/main/java/base/consumer/Consumer.java +++ b/rocketmq-demo/src/main/java/base/consumer/Consumer.java @@ -1,14 +1,11 @@ package base.consumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; -import java.util.List; - /** * 消息的接受者 * @@ -28,15 +25,12 @@ public class Consumer { //设定消费模式:负载均衡|广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); //4.设置回调函数,处理消息 - consumer.registerMessageListener(new MessageListenerConcurrently() { - //接受消息内容 - @Override - public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) { - for (MessageExt msg : msgs) { - System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; + //接受消息内容 + consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { + for (MessageExt msg : msgs) { + System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); } + return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); //5.启动消费者consumer consumer.start(); diff --git a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java index e9d348b360ebd12c1ccb8a1092a6620178144dcf..2c54e77cf5dc735d4e143b68e914d9afd5ec2e4c 100644 --- a/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/AsyncProducer.java @@ -7,8 +7,13 @@ import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; + /** * 发送异步消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:17 */ public class AsyncProducer { public static void main(String[] args) throws Exception { diff --git a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java index d8903694e0079e2ac9860f0d84505db83e729259..a821da59aff098d923d30c8366c00e860301d20c 100644 --- a/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/OneWayProducer.java @@ -1,10 +1,18 @@ package base.producer; + import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; + import java.util.concurrent.TimeUnit; + + /** * 发送单向消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:18 */ public class OneWayProducer { public static void main(String[] args) throws Exception, MQBrokerException { diff --git a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java index f44a46e2b7db6ce52c72565cd48061dacd83c807..395696cc580cbb0659ed3990c3187edef3d077f0 100644 --- a/rocketmq-demo/src/main/java/base/producer/SyncProducer.java +++ b/rocketmq-demo/src/main/java/base/producer/SyncProducer.java @@ -9,9 +9,12 @@ import java.util.concurrent.TimeUnit; /** * 发送同步消息 + * + * @author : qinyingjie + * @version : 2.2.0 + * @date : 2023/5/26 14:18 */ public class SyncProducer { - public static void main(String[] args) throws Exception { //1.创建消息生产者producer,并制定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("group1"); @@ -20,7 +23,6 @@ public class SyncProducer { producer.setSendMessageWithVIPChannel(false); //3.启动producer producer.start(); - for (int i = 0; i < 10; i++) { //4.创建消息对象,指定主题Topic、Tag和消息体 /** @@ -33,14 +35,11 @@ public class SyncProducer { SendResult result = producer.send(msg); //发送状态 SendStatus status = result.getSendStatus(); - System.out.println("发送结果:" + result); - //线程睡1秒 TimeUnit.SECONDS.sleep(1); } - //6.关闭生产者producer producer.shutdown(); } -} +} \ No newline at end of file