提交 261fb5ee 编写于 作者: 檀越@新空间's avatar 檀越@新空间 🐭

fix:发送各种消息

上级 1642db01
...@@ -8,6 +8,13 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -8,6 +8,13 @@ import org.apache.rocketmq.common.message.MessageExt;
import java.util.List; import java.util.List;
/**
* 消费消息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:49
*/
public class Consumer { public class Consumer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
...@@ -16,10 +23,8 @@ public class Consumer { ...@@ -16,10 +23,8 @@ public class Consumer {
consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("BatchTopic", "*"); consumer.subscribe("BatchTopic", "*");
//4.设置回调函数,处理消息 //4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容 //接受消息内容
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
...@@ -31,7 +36,6 @@ public class Consumer { ...@@ -31,7 +36,6 @@ public class Consumer {
}); });
//5.启动消费者consumer //5.启动消费者consumer
consumer.start(); consumer.start();
System.out.println("消费者启动"); System.out.println("消费者启动");
} }
} }
\ No newline at end of file
...@@ -9,8 +9,15 @@ import java.util.ArrayList; ...@@ -9,8 +9,15 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class Producer {
/**
* 批量发送消息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:49
*/
public class Producer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
...@@ -18,11 +25,7 @@ public class Producer { ...@@ -18,11 +25,7 @@ public class Producer {
producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
List<Message> msgs = new ArrayList<Message>(); List<Message> msgs = new ArrayList<Message>();
//4.创建消息对象,指定主题Topic、Tag和消息体 //4.创建消息对象,指定主题Topic、Tag和消息体
/** /**
* 参数一:消息主题Topic * 参数一:消息主题Topic
...@@ -32,24 +35,17 @@ public class Producer { ...@@ -32,24 +35,17 @@ public class Producer {
Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes()); Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());
Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes()); Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());
Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes()); Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());
msgs.add(msg1); msgs.add(msg1);
msgs.add(msg2); msgs.add(msg2);
msgs.add(msg3); msgs.add(msg3);
//5.发送消息 //5.发送消息
SendResult result = producer.send(msgs); SendResult result = producer.send(msgs);
//发送状态 //发送状态
SendStatus status = result.getSendStatus(); SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result); System.out.println("发送结果:" + result);
//线程睡1秒 //线程睡1秒
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
//6.关闭生产者producer //6.关闭生产者producer
producer.shutdown(); producer.shutdown();
} }
}
} \ No newline at end of file
package delay; package delay;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import java.util.List; /**
*
* 延迟消息消费
*@version : 2.2.0
*@author : qinyingjie
*@date : 2023/5/26 14:46
*/
public class Consumer { public class Consumer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
...@@ -17,22 +20,16 @@ public class Consumer { ...@@ -17,22 +20,16 @@ public class Consumer {
consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("DelayTopic", "*"); consumer.subscribe("DelayTopic", "*");
//4.设置回调函数,处理消息 //4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//接受消息内容 for (MessageExt msg : msgs) {
@Override System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消息ID:【" + msg.getMsgId() + "】,延迟时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}); });
//5.启动消费者consumer //5.启动消费者consumer
consumer.start(); consumer.start();
System.out.println("消费者启动"); System.out.println("消费者启动");
} }
} }
\ No newline at end of file
...@@ -10,8 +10,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException; ...@@ -10,8 +10,14 @@ import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/**
* 延迟消息生产
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:46
*/
public class Producer { public class Producer {
public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
...@@ -19,7 +25,6 @@ public class Producer { ...@@ -19,7 +25,6 @@ public class Producer {
producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体 //4.创建消息对象,指定主题Topic、Tag和消息体
/** /**
...@@ -34,15 +39,11 @@ public class Producer { ...@@ -34,15 +39,11 @@ public class Producer {
SendResult result = producer.send(msg); SendResult result = producer.send(msg);
//发送状态 //发送状态
SendStatus status = result.getSendStatus(); SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result); System.out.println("发送结果:" + result);
//线程睡1秒 //线程睡1秒
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
} }
//6.关闭生产者producer //6.关闭生产者producer
producer.shutdown(); producer.shutdown();
} }
}
} \ No newline at end of file
...@@ -17,10 +17,8 @@ public class Consumer { ...@@ -17,10 +17,8 @@ public class Consumer {
consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5")); consumer.subscribe("FilterSQLTopic", MessageSelector.bySql("i>5"));
//4.设置回调函数,处理消息 //4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容 //接受消息内容
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
...@@ -34,4 +32,4 @@ public class Consumer { ...@@ -34,4 +32,4 @@ public class Consumer {
consumer.start(); consumer.start();
System.out.println("消费者启动"); System.out.println("消费者启动");
} }
} }
\ No newline at end of file
...@@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message; ...@@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class Producer { public class Producer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
...@@ -16,7 +15,6 @@ public class Producer { ...@@ -16,7 +15,6 @@ public class Producer {
producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体 //4.创建消息对象,指定主题Topic、Tag和消息体
/** /**
...@@ -25,22 +23,16 @@ public class Producer { ...@@ -25,22 +23,16 @@ public class Producer {
* 参数三:消息内容 * 参数三:消息内容
*/ */
Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes()); Message msg = new Message("FilterSQLTopic", "Tag1", ("Hello World" + i).getBytes());
msg.putUserProperty("i", String.valueOf(i)); msg.putUserProperty("i", String.valueOf(i));
//5.发送消息 //5.发送消息
SendResult result = producer.send(msg); SendResult result = producer.send(msg);
//发送状态 //发送状态
SendStatus status = result.getSendStatus(); SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result); System.out.println("发送结果:" + result);
//线程睡1秒 //线程睡1秒
TimeUnit.SECONDS.sleep(2); TimeUnit.SECONDS.sleep(2);
} }
//6.关闭生产者producer //6.关闭生产者producer
producer.shutdown(); producer.shutdown();
} }
}
} \ No newline at end of file
...@@ -16,10 +16,8 @@ public class Consumer { ...@@ -16,10 +16,8 @@ public class Consumer {
consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 "); consumer.subscribe("FilterTagTopic", "Tag1 || Tag2 ");
//4.设置回调函数,处理消息 //4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容 //接受消息内容
@Override @Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
...@@ -33,4 +31,4 @@ public class Consumer { ...@@ -33,4 +31,4 @@ public class Consumer {
consumer.start(); consumer.start();
System.out.println("消费者启动"); System.out.println("消费者启动");
} }
} }
\ No newline at end of file
...@@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message; ...@@ -8,7 +8,6 @@ import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class Producer { public class Producer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
...@@ -16,7 +15,6 @@ public class Producer { ...@@ -16,7 +15,6 @@ public class Producer {
producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.启动producer //3.启动producer
producer.start(); producer.start();
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体 //4.创建消息对象,指定主题Topic、Tag和消息体
/** /**
...@@ -29,15 +27,11 @@ public class Producer { ...@@ -29,15 +27,11 @@ public class Producer {
SendResult result = producer.send(msg); SendResult result = producer.send(msg);
//发送状态 //发送状态
SendStatus status = result.getSendStatus(); SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result); System.out.println("发送结果:" + result);
//线程睡1秒 //线程睡1秒
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
} }
//6.关闭生产者producer //6.关闭生产者producer
producer.shutdown(); producer.shutdown();
} }
}
} \ No newline at end of file
...@@ -9,6 +9,14 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -9,6 +9,14 @@ import org.apache.rocketmq.common.message.MessageExt;
import java.util.List; import java.util.List;
/**
* 顺序消费
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:40
*/
public class Consumer { public class Consumer {
public static void main(String[] args) throws MQClientException { public static void main(String[] args) throws MQClientException {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
...@@ -17,10 +25,8 @@ public class Consumer { ...@@ -17,10 +25,8 @@ public class Consumer {
consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("OrderTopic", "*"); consumer.subscribe("OrderTopic", "*");
//4.注册消息监听器 //4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() { consumer.registerMessageListener(new MessageListenerOrderly() {
@Override @Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) { for (MessageExt msg : msgs) {
...@@ -29,11 +35,8 @@ public class Consumer { ...@@ -29,11 +35,8 @@ public class Consumer {
return ConsumeOrderlyStatus.SUCCESS; return ConsumeOrderlyStatus.SUCCESS;
} }
}); });
//5.启动消费者 //5.启动消费者
consumer.start(); consumer.start();
System.out.println("消费者启动"); System.out.println("消费者启动");
} }
} }
\ No newline at end of file
...@@ -39,57 +39,46 @@ public class OrderStep { ...@@ -39,57 +39,46 @@ public class OrderStep {
// 1065L : 创建 付款 // 1065L : 创建 付款
// 7235L :创建 付款 // 7235L :创建 付款
List<OrderStep> orderList = new ArrayList<OrderStep>(); List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep(); OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(1039L); orderDemo.setOrderId(1039L);
orderDemo.setDesc("创建"); orderDemo.setDesc("创建");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(1065L); orderDemo.setOrderId(1065L);
orderDemo.setDesc("创建"); orderDemo.setDesc("创建");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(1039L); orderDemo.setOrderId(1039L);
orderDemo.setDesc("付款"); orderDemo.setDesc("付款");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(7235L); orderDemo.setOrderId(7235L);
orderDemo.setDesc("创建"); orderDemo.setDesc("创建");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(1065L); orderDemo.setOrderId(1065L);
orderDemo.setDesc("付款"); orderDemo.setDesc("付款");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(7235L); orderDemo.setOrderId(7235L);
orderDemo.setDesc("付款"); orderDemo.setDesc("付款");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(1065L); orderDemo.setOrderId(1065L);
orderDemo.setDesc("完成"); orderDemo.setDesc("完成");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(1039L); orderDemo.setOrderId(1039L);
orderDemo.setDesc("推送"); orderDemo.setDesc("推送");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(7235L); orderDemo.setOrderId(7235L);
orderDemo.setDesc("完成"); orderDemo.setDesc("完成");
orderList.add(orderDemo); orderList.add(orderDemo);
orderDemo = new OrderStep(); orderDemo = new OrderStep();
orderDemo.setOrderId(1039L); orderDemo.setOrderId(1039L);
orderDemo.setDesc("完成"); orderDemo.setDesc("完成");
orderList.add(orderDemo); orderList.add(orderDemo);
return orderList; return orderList;
} }
} }
\ No newline at end of file
...@@ -8,8 +8,14 @@ import org.apache.rocketmq.common.message.MessageQueue; ...@@ -8,8 +8,14 @@ import org.apache.rocketmq.common.message.MessageQueue;
import java.util.List; import java.util.List;
/**
* 顺序消息生产者
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:42
*/
public class Producer { public class Producer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
...@@ -43,10 +49,8 @@ public class Producer { ...@@ -43,10 +49,8 @@ public class Producer {
return mqs.get((int) index); return mqs.get((int) index);
} }
}, orderSteps.get(i).getOrderId()); }, orderSteps.get(i).getOrderId());
System.out.println("发送结果:" + sendResult); System.out.println("发送结果:" + sendResult);
} }
producer.shutdown(); producer.shutdown();
} }
}
} \ No newline at end of file
package transaction; package transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/** /**
* 消息的接受者 * 消息的接受者
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:57
*/ */
public class Consumer { public class Consumer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名 //1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1"); DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
...@@ -20,22 +20,16 @@ public class Consumer { ...@@ -20,22 +20,16 @@ public class Consumer {
consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); consumer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//3.订阅主题Topic和Tag //3.订阅主题Topic和Tag
consumer.subscribe("TransactionTopic", "*"); consumer.subscribe("TransactionTopic", "*");
//4.设置回调函数,处理消息 //4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() { //接受消息内容
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//接受消息内容 for (MessageExt msg : msgs) {
@Override System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}); });
//5.启动消费者consumer //5.启动消费者consumer
consumer.start(); consumer.start();
System.out.println("生产者启动"); System.out.println("生产者启动");
} }
} }
\ No newline at end of file
...@@ -8,16 +8,18 @@ import org.apache.rocketmq.common.message.MessageExt; ...@@ -8,16 +8,18 @@ import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 发送同步消息 * 发送事务消息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:57
*/ */
public class Producer { public class Producer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("group5"); TransactionMQProducer producer = new TransactionMQProducer("group5");
//2.指定Nameserver地址 //2.指定Nameserver地址
producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876"); producer.setNamesrvAddr("47.119.161.70:9876;47.119.163.226:9876");
//添加事务监听器 //添加事务监听器
producer.setTransactionListener(new TransactionListener() { producer.setTransactionListener(new TransactionListener() {
/** /**
...@@ -49,12 +51,9 @@ public class Producer { ...@@ -49,12 +51,9 @@ public class Producer {
return LocalTransactionState.COMMIT_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE;
} }
}); });
//3.启动producer //3.启动producer
producer.start(); producer.start();
String[] tags = {"TAGA", "TAGB", "TAGC"}; String[] tags = {"TAGA", "TAGB", "TAGC"};
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体 //4.创建消息对象,指定主题Topic、Tag和消息体
/** /**
...@@ -67,14 +66,11 @@ public class Producer { ...@@ -67,14 +66,11 @@ public class Producer {
SendResult result = producer.sendMessageInTransaction(msg, null); SendResult result = producer.sendMessageInTransaction(msg, null);
//发送状态 //发送状态
SendStatus status = result.getSendStatus(); SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result); System.out.println("发送结果:" + result);
//线程睡1秒 //线程睡1秒
TimeUnit.SECONDS.sleep(2); TimeUnit.SECONDS.sleep(2);
} }
//6.关闭生产者producer //6.关闭生产者producer
//producer.shutdown(); //producer.shutdown();
} }
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册