“7e73c929a236b55bd474f414b14194cd950cdc9f”上不存在“testsuites/unittest/security/capability/config.gni”
提交 264aa2d9 编写于 作者: 檀越@新空间's avatar 檀越@新空间 🐭

fix:最基础的发消息

上级 04124864
package base.consumer; package base.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/** /**
* 消息的接受者 * 消息的接受者
* *
...@@ -28,15 +25,12 @@ public class Consumer { ...@@ -28,15 +25,12 @@ public class Consumer {
//设定消费模式:负载均衡|广播模式 //设定消费模式:负载均衡|广播模式
consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setMessageModel(MessageModel.BROADCASTING);
//4.设置回调函数,处理消息 //4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接受消息内容 //接受消息内容
@Override consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) { for (MessageExt msg : msgs) {
System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody())); System.out.println("consumeThread=" + Thread.currentThread().getName() + "," + new String(msg.getBody()));
} }
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}); });
//5.启动消费者consumer //5.启动消费者consumer
consumer.start(); consumer.start();
......
...@@ -7,8 +7,13 @@ import org.apache.rocketmq.common.message.Message; ...@@ -7,8 +7,13 @@ import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 发送异步消息 * 发送异步消息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:17
*/ */
public class AsyncProducer { public class AsyncProducer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
......
package base.producer; package base.producer;
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.Message;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* 发送单向消息 * 发送单向消息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:18
*/ */
public class OneWayProducer { public class OneWayProducer {
public static void main(String[] args) throws Exception, MQBrokerException { public static void main(String[] args) throws Exception, MQBrokerException {
......
...@@ -9,9 +9,12 @@ import java.util.concurrent.TimeUnit; ...@@ -9,9 +9,12 @@ import java.util.concurrent.TimeUnit;
/** /**
* 发送同步消息 * 发送同步消息
*
* @author : qinyingjie
* @version : 2.2.0
* @date : 2023/5/26 14:18
*/ */
public class SyncProducer { public class SyncProducer {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名 //1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1"); DefaultMQProducer producer = new DefaultMQProducer("group1");
...@@ -20,7 +23,6 @@ public class SyncProducer { ...@@ -20,7 +23,6 @@ public class SyncProducer {
producer.setSendMessageWithVIPChannel(false); producer.setSendMessageWithVIPChannel(false);
//3.启动producer //3.启动producer
producer.start(); producer.start();
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体 //4.创建消息对象,指定主题Topic、Tag和消息体
/** /**
...@@ -33,13 +35,10 @@ public class SyncProducer { ...@@ -33,13 +35,10 @@ public class SyncProducer {
SendResult result = producer.send(msg); SendResult result = producer.send(msg);
//发送状态 //发送状态
SendStatus status = result.getSendStatus(); SendStatus status = result.getSendStatus();
System.out.println("发送结果:" + result); System.out.println("发送结果:" + result);
//线程睡1秒 //线程睡1秒
TimeUnit.SECONDS.sleep(1); TimeUnit.SECONDS.sleep(1);
} }
//6.关闭生产者producer //6.关闭生产者producer
producer.shutdown(); producer.shutdown();
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册